diff --git a/package/debian/veilid-server/DEBIAN/conffiles b/package/debian/veilid-server/DEBIAN/conffiles new file mode 100644 index 00000000..b9fff798 --- /dev/null +++ b/package/debian/veilid-server/DEBIAN/conffiles @@ -0,0 +1 @@ +/etc/veilid-server/veilid-server.conf diff --git a/package/debian/veilid-server/DEBIAN/postrm b/package/debian/veilid-server/DEBIAN/postrm index b7401ae4..26de0e1c 100755 --- a/package/debian/veilid-server/DEBIAN/postrm +++ b/package/debian/veilid-server/DEBIAN/postrm @@ -1,4 +1,5 @@ #!/bin/sh + set -e if [ -d /run/systemd/system ]; then diff --git a/scripts/local-test.yml b/scripts/local-test.yml index 07f9fff8..1e8a7258 100644 --- a/scripts/local-test.yml +++ b/scripts/local-test.yml @@ -4,3 +4,4 @@ core: dht: min_peer_count: 1 enable_local_peer_scope: true + bootstrap: [] diff --git a/veilid-core/src/connection_manager.rs b/veilid-core/src/connection_manager.rs index 8536b753..7ba16c16 100644 --- a/veilid-core/src/connection_manager.rs +++ b/veilid-core/src/connection_manager.rs @@ -97,6 +97,7 @@ impl ConnectionManager { inner: &mut ConnectionManagerInner, conn: NetworkConnection, ) -> Result<(), String> { + log_net!("on_new_connection_internal: {:?}", conn); let tx = inner .connection_add_channel_tx .as_ref() diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index c1d5ac50..cdfff354 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -316,6 +316,11 @@ impl Network { &peer_socket_addr, &descriptor.local.map(|sa| sa.to_socket_addr()), ) { + log_net!( + "send_data_to_existing_connection connectionless to {:?}", + descriptor + ); + ph.clone() .send_message(data, peer_socket_addr) .await @@ -334,6 +339,8 @@ impl Network { // Try to send to the exact existing connection if one exists if let Some(conn) = self.connection_manager().get_connection(descriptor).await { + log_net!("send_data_to_existing_connection to {:?}", descriptor); + // connection exists, send over it conn.send(data).await.map_err(logthru_net!())?; diff --git a/veilid-core/src/intf/native/network/network_tcp.rs b/veilid-core/src/intf/native/network/network_tcp.rs index 50f47538..9f153f5e 100644 --- a/veilid-core/src/intf/native/network/network_tcp.rs +++ b/veilid-core/src/intf/native/network/network_tcp.rs @@ -77,10 +77,15 @@ impl Network { protocol_handlers: &[Box], ) -> Result, String> { for ah in protocol_handlers.iter() { - if let Some(nc) = ah.on_accept(stream.clone(), addr).await? { + if let Some(nc) = ah + .on_accept(stream.clone(), addr) + .await + .map_err(logthru_net!())? + { return Ok(Some(nc)); } } + Ok(None) } @@ -105,7 +110,7 @@ impl Network { let std_listener: std::net::TcpListener = socket.into(); let listener = TcpListener::from(std_listener); - trace!("spawn_socket_listener: binding successful to {}", addr); + debug!("spawn_socket_listener: binding successful to {}", addr); // Create protocol handler records let listener_state = Arc::new(RwLock::new(ListenerState::new())); @@ -140,7 +145,7 @@ impl Network { }; // XXX limiting - trace!("TCP connection from: {}", addr); + log_net!("TCP connection from: {}", addr); // Create a stream we can peek on let ps = AsyncPeekStream::new(tcp_stream); @@ -166,6 +171,7 @@ impl Network { // Check is this could be TLS let ls = listener_state.read().clone(); + let conn = if ls.tls_acceptor.is_some() && first_packet[0] == 0x16 { this.try_tls_handlers( ls.tls_acceptor.as_ref().unwrap(), @@ -178,28 +184,34 @@ impl Network { } else { this.try_handlers(ps, addr, &ls.protocol_handlers).await }; + let conn = match conn { - Ok(Some(c)) => c, + Ok(Some(c)) => { + log_net!("protocol handler found for {:?}: {:?}", addr, c); + c + } Ok(None) => { // No protocol handlers matched? drop it. + log_net!(warn "no protocol handler for connection from {:?}", addr); return; } - Err(_) => { + Err(e) => { // Failed to negotiate connection? drop it. + log_net!(warn "failed to negotiate connection from {:?}: {}", addr, e); return; } }; // Register the new connection in the connection manager if let Err(e) = connection_manager.on_new_connection(conn).await { - error!("failed to register new connection: {}", e); + log_net!(error "failed to register new connection: {}", e); } }) .await; - trace!("exited incoming loop for {}", addr); + log_net!(debug "exited incoming loop for {}", addr); // Remove our listener state from this address if we're stopping this.inner.lock().listener_states.remove(&addr); - trace!("listener state removed for {}", addr); + log_net!(debug "listener state removed for {}", addr); // If this happened our low-level listener socket probably died // so it's time to restart the network diff --git a/veilid-core/src/intf/native/network/protocol/sockets.rs b/veilid-core/src/intf/native/network/protocol/sockets.rs index ac1e18db..a9c098e0 100644 --- a/veilid-core/src/intf/native/network/protocol/sockets.rs +++ b/veilid-core/src/intf/native/network/protocol/sockets.rs @@ -100,7 +100,7 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> Result { let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) .map_err(map_to_string) .map_err(logthru_net!("failed to create TCP socket"))?; - if let Err(e) = socket.set_linger(None) { + if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { log_net!(error "Couldn't set TCP linger: {}", e); } if let Err(e) = socket.set_nodelay(true) { @@ -144,7 +144,7 @@ pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> Result Result, String> { + log_net!("TCP: on_accept_async: enter"); let mut peekbuf: [u8; PEEK_DETECT_LEN] = [0u8; PEEK_DETECT_LEN]; let peeklen = stream .peek(&mut peekbuf) @@ -125,7 +126,7 @@ impl RawTcpProtocolHandler { ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)), ); - log_net!("on_accept_async from: {}", socket_addr); + log_net!(debug "TCP: on_accept_async from: {}", socket_addr); Ok(Some(conn)) } diff --git a/veilid-core/src/intf/native/network/protocol/ws.rs b/veilid-core/src/intf/native/network/protocol/ws.rs index fcd348f8..d41ba79d 100644 --- a/veilid-core/src/intf/native/network/protocol/ws.rs +++ b/veilid-core/src/intf/native/network/protocol/ws.rs @@ -127,7 +127,9 @@ impl WebsocketProtocolHandler { ps: AsyncPeekStream, socket_addr: SocketAddr, ) -> Result, String> { + log_net!("WS: on_accept_async: enter"); let request_path_len = self.arc.request_path.len() + 2; + let mut peekbuf: Vec = vec![0u8; request_path_len]; match io::timeout( Duration::from_micros(self.arc.connection_initial_timeout), @@ -143,6 +145,7 @@ impl WebsocketProtocolHandler { return Err(e).map_err(map_to_string).map_err(logthru_net!(error)); } } + // Check for websocket path let matches_path = &peekbuf[0..request_path_len - 2] == self.arc.request_path.as_slice() && (peekbuf[request_path_len - 2] == b' ' @@ -150,14 +153,10 @@ impl WebsocketProtocolHandler { && peekbuf[request_path_len - 1] == b' ')); if !matches_path { - log_net!( - "not websocket: request_path: {} peekbuf:{}", - std::str::from_utf8(&self.arc.request_path).unwrap(), - std::str::from_utf8(&peekbuf).unwrap() - ); + log_net!("WS: not websocket"); return Ok(None); } - log_net!("found websocket"); + log_net!("WS: found websocket"); let ws_stream = accept_async(ps) .await @@ -182,6 +181,8 @@ impl WebsocketProtocolHandler { ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new(ws_stream)), ); + log_net!(debug "{}: on_accept_async from: {}", if self.arc.tls { "WSS" } else { "WS" }, socket_addr); + Ok(Some(conn)) } diff --git a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs index 697bb704..2932efe7 100644 --- a/veilid-core/src/intf/native/utils/network_interfaces/mod.rs +++ b/veilid-core/src/intf/native/utils/network_interfaces/mod.rs @@ -345,7 +345,11 @@ impl NetworkInterfaces { if changed { self.cache_best_addresses(); - trace!("NetworkInterfaces refreshed: {:#?}?", self); + //trace!("NetworkInterfaces refreshed: {:#?}?", self); + trace!( + "NetworkInterfaces refreshed: {:#?}?", + self.interface_address_cache + ); } Ok(changed) } diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index 5007686a..ca275e23 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -66,10 +66,11 @@ impl Default for NetworkManagerStats { } struct ClientWhitelistEntry { - last_seen: u64, + last_seen_ts: u64, } // Mechanism required to contact another node +#[derive(Clone, Debug)] enum ContactMethod { Unreachable, // Node is not reachable by any means Direct(DialInfo), // Contact the node directly @@ -294,11 +295,11 @@ impl NetworkManager { let mut inner = self.inner.lock(); match inner.client_whitelist.entry(client) { hashlink::lru_cache::Entry::Occupied(mut entry) => { - entry.get_mut().last_seen = intf::get_timestamp() + entry.get_mut().last_seen_ts = intf::get_timestamp() } hashlink::lru_cache::Entry::Vacant(entry) => { entry.insert(ClientWhitelistEntry { - last_seen: intf::get_timestamp(), + last_seen_ts: intf::get_timestamp(), }); } } @@ -309,7 +310,7 @@ impl NetworkManager { match inner.client_whitelist.entry(client) { hashlink::lru_cache::Entry::Occupied(mut entry) => { - entry.get_mut().last_seen = intf::get_timestamp(); + entry.get_mut().last_seen_ts = intf::get_timestamp(); true } hashlink::lru_cache::Entry::Vacant(_) => false, @@ -324,7 +325,7 @@ impl NetworkManager { while inner .client_whitelist .peek_lru() - .map(|v| v.1.last_seen < cutoff_timestamp) + .map(|v| v.1.last_seen_ts < cutoff_timestamp) .unwrap_or_default() { inner.client_whitelist.remove_lru(); @@ -441,7 +442,7 @@ impl NetworkManager { &self, expiration_us: u64, extra_data: D, - ) -> Result<(Vec, EventualValueCloneFuture), String> { + ) -> Result<(Vec, EventualValueFuture), String> { let receipt_manager = self.receipt_manager(); let routing_table = self.routing_table(); @@ -454,7 +455,7 @@ impl NetworkManager { // Record the receipt for later let exp_ts = intf::get_timestamp() + expiration_us; - let eventual = SingleShotEventual::new(ReceiptEvent::Cancelled); + let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled)); let instance = eventual.instance(); receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual); @@ -761,11 +762,12 @@ impl NetworkManager { return Ok(ContactMethod::OutboundRelay(relay_node)); } // Otherwise, we can't reach this node - debug!( - "unable to reach node {:?}: {}", - target_node_ref, - target_node_ref.operate(|e| format!("{:#?}", e)) - ); + debug!("unable to reach node {:?}", target_node_ref); + // trace!( + // "unable to reach node {:?}: {}", + // target_node_ref, + // target_node_ref.operate(|e| format!("{:#?}", e)) + // ); Ok(ContactMethod::Unreachable) } @@ -797,9 +799,8 @@ impl NetworkManager { .await .map_err(logthru_net!("failed to send signal to {:?}", relay_nr)) .map_err(map_to_string)?; - // Wait for the return receipt - let inbound_nr = match eventual_value.await { + let inbound_nr = match eventual_value.await.take_value().unwrap() { ReceiptEvent::Returned(inbound_nr) => inbound_nr, ReceiptEvent::Expired => { return Err(format!( @@ -888,7 +889,7 @@ impl NetworkManager { .map_err(map_to_string)?; // Wait for the return receipt - let inbound_nr = match eventual_value.await { + let inbound_nr = match eventual_value.await.take_value().unwrap() { ReceiptEvent::Returned(inbound_nr) => inbound_nr, ReceiptEvent::Expired => { return Err(format!("hole punch receipt expired from {:?}", target_nr)); @@ -957,8 +958,13 @@ impl NetworkManager { data }; + log_net!("send_data via dialinfo to {:?}", node_ref); // If we don't have last_connection, try to reach out to the peer via its dial info - match this.get_contact_method(node_ref).map_err(logthru_net!())? { + match this + .get_contact_method(node_ref.clone()) + .map_err(logthru_net!(debug)) + .map(logthru_net!("get_contact_method for {:?}", node_ref))? + { ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { this.send_data(relay_nr, data) .await @@ -985,7 +991,7 @@ impl NetworkManager { .map(|_| SendDataKind::GlobalDirect), ContactMethod::Unreachable => Err("Can't send to this node".to_owned()), } - .map_err(logthru_net!()) + .map_err(logthru_net!(debug)) }) } @@ -1122,7 +1128,7 @@ impl NetworkManager { // Keep relays assigned and accessible async fn relay_management_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> { - log_net!("--- network manager relay_management task"); + // log_net!("--- network manager relay_management task"); // Get our node's current node info and network class and do the right thing let routing_table = self.routing_table(); @@ -1174,7 +1180,7 @@ impl NetworkManager { // Compute transfer statistics for the low level network async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { - log_net!("--- network manager rolling_transfers task"); + // log_net!("--- network manager rolling_transfers task"); { let inner = &mut *self.inner.lock(); diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 3d16fb7b..f2731f18 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -23,6 +23,9 @@ const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5; // remains valid, as well as to make sure we remain in any relay node's routing table const KEEPALIVE_PING_INTERVAL_SECS: u32 = 20; +// How many times do we try to ping a never-reached node before we call it dead +const NEVER_REACHED_PING_COUNT: u32 = 3; + // Do not change order here, it will mess up other sorts #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum BucketEntryState { @@ -58,7 +61,6 @@ impl BucketEntry { transfer_stats_accounting: TransferStatsAccounting::new(), peer_stats: PeerStats { time_added: now, - last_seen: None, rpc_stats: RPCStats::default(), latency: None, transfer: TransferStatsDownUp::default(), @@ -129,7 +131,7 @@ impl BucketEntry { pub fn has_valid_signed_node_info(&self) -> bool { if let Some(sni) = &self.opt_signed_node_info { - sni.signature.valid + sni.is_valid() } else { false } @@ -213,8 +215,13 @@ impl BucketEntry { ///// state machine handling pub(super) fn check_reliable(&self, cur_ts: u64) -> bool { - // if we have had consecutive ping replies for longer that UNRELIABLE_PING_SPAN_SECS - match self.peer_stats.rpc_stats.first_consecutive_answer_time { + // If we have had any failures to send, this is not reliable + if self.peer_stats.rpc_stats.failed_to_send > 0 { + return false; + } + + // if we have seen the node consistently for longer that UNRELIABLE_PING_SPAN_SECS + match self.peer_stats.rpc_stats.first_consecutive_seen_ts { None => false, Some(ts) => { cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) @@ -222,10 +229,15 @@ impl BucketEntry { } } pub(super) fn check_dead(&self, cur_ts: u64) -> bool { + // If we have failured to send NEVER_REACHED_PING_COUNT times in a row, the node is dead + if self.peer_stats.rpc_stats.failed_to_send >= NEVER_REACHED_PING_COUNT { + return true; + } // if we have not heard from the node at all for the duration of the unreliable ping span - // a node is not dead if we haven't heard from it yet - match self.peer_stats.last_seen { - None => false, + // a node is not dead if we haven't heard from it yet, + // but we give it NEVER_REACHED_PING_COUNT chances to ping before we say it's dead + match self.peer_stats.rpc_stats.last_seen_ts { + None => self.peer_stats.rpc_stats.recent_lost_answers < NEVER_REACHED_PING_COUNT, Some(ts) => { cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64) } @@ -233,9 +245,20 @@ impl BucketEntry { } fn needs_constant_ping(&self, cur_ts: u64, interval: u64) -> bool { - match self.peer_stats.last_seen { + // If we have not either seen the node, nor asked it a question in the last 'interval' + // then we should ping it + let latest_contact_time = self + .peer_stats + .rpc_stats + .last_seen_ts + .max(self.peer_stats.rpc_stats.last_question); + + match latest_contact_time { None => true, - Some(last_seen) => cur_ts.saturating_sub(last_seen) >= (interval * 1000000u64), + Some(latest_contact_time) => { + // If we haven't done anything with this node in 'interval' seconds + cur_ts.saturating_sub(latest_contact_time) >= (interval * 1000000u64) + } } } @@ -259,19 +282,26 @@ impl BucketEntry { match state { BucketEntryState::Reliable => { // If we are in a reliable state, we need a ping on an exponential scale - match self.peer_stats.last_seen { - None => true, - Some(last_seen) => { - let first_consecutive_answer_time = self - .peer_stats - .rpc_stats - .first_consecutive_answer_time - .unwrap(); - let start_of_reliable_time = first_consecutive_answer_time + let latest_contact_time = self + .peer_stats + .rpc_stats + .last_seen_ts + .max(self.peer_stats.rpc_stats.last_question); + + match latest_contact_time { + None => { + error!("Peer is reliable, but not seen!"); + true + } + Some(latest_contact_time) => { + let first_consecutive_seen_ts = + self.peer_stats.rpc_stats.first_consecutive_seen_ts.unwrap(); + let start_of_reliable_time = first_consecutive_seen_ts + ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64 * 1_000_000u64); let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time); - let reliable_last = last_seen.saturating_sub(start_of_reliable_time); + let reliable_last = + latest_contact_time.saturating_sub(start_of_reliable_time); retry_falloff_log( reliable_last, @@ -292,37 +322,44 @@ impl BucketEntry { } pub(super) fn touch_last_seen(&mut self, ts: u64) { - // If we've heard from the node at all, we can always restart our lost ping count - self.peer_stats.rpc_stats.recent_lost_answers = 0; // Mark the node as seen - self.peer_stats.last_seen = Some(ts); + if self + .peer_stats + .rpc_stats + .first_consecutive_seen_ts + .is_none() + { + self.peer_stats.rpc_stats.first_consecutive_seen_ts = Some(ts); + } + + self.peer_stats.rpc_stats.last_seen_ts = Some(ts); } pub(super) fn state_debug_info(&self, cur_ts: u64) -> String { - let first_consecutive_answer_time = if let Some(first_consecutive_answer_time) = - self.peer_stats.rpc_stats.first_consecutive_answer_time + let first_consecutive_seen_ts = if let Some(first_consecutive_seen_ts) = + self.peer_stats.rpc_stats.first_consecutive_seen_ts { format!( "{}s ago", - timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_answer_time)) + timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_seen_ts)) ) } else { "never".to_owned() }; - let last_seen = if let Some(last_seen) = self.peer_stats.last_seen { + let last_seen_ts_str = if let Some(last_seen_ts) = self.peer_stats.rpc_stats.last_seen_ts { format!( "{}s ago", - timestamp_to_secs(cur_ts.saturating_sub(last_seen)) + timestamp_to_secs(cur_ts.saturating_sub(last_seen_ts)) ) } else { "never".to_owned() }; format!( - "state: {:?}, first_consecutive_answer_time: {}, last_seen: {}", + "state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}", self.state(cur_ts), - first_consecutive_answer_time, - last_seen + first_consecutive_seen_ts, + last_seen_ts_str ) } @@ -332,11 +369,10 @@ impl BucketEntry { pub(super) fn question_sent(&mut self, ts: u64, bytes: u64, expects_answer: bool) { self.transfer_stats_accounting.add_up(bytes); self.peer_stats.rpc_stats.messages_sent += 1; + self.peer_stats.rpc_stats.failed_to_send = 0; if expects_answer { self.peer_stats.rpc_stats.questions_in_flight += 1; - } - if self.peer_stats.last_seen.is_none() { - self.peer_stats.last_seen = Some(ts); + self.peer_stats.rpc_stats.last_question = Some(ts); } } pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) { @@ -344,33 +380,40 @@ impl BucketEntry { self.peer_stats.rpc_stats.messages_rcvd += 1; self.touch_last_seen(ts); } - pub(super) fn answer_sent(&mut self, _ts: u64, bytes: u64) { + pub(super) fn answer_sent(&mut self, bytes: u64) { self.transfer_stats_accounting.add_up(bytes); self.peer_stats.rpc_stats.messages_sent += 1; + self.peer_stats.rpc_stats.failed_to_send = 0; } pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) { self.transfer_stats_accounting.add_down(bytes); self.peer_stats.rpc_stats.messages_rcvd += 1; self.peer_stats.rpc_stats.questions_in_flight -= 1; - if self - .peer_stats - .rpc_stats - .first_consecutive_answer_time - .is_none() - { - self.peer_stats.rpc_stats.first_consecutive_answer_time = Some(recv_ts); - } self.record_latency(recv_ts - send_ts); self.touch_last_seen(recv_ts); + self.peer_stats.rpc_stats.recent_lost_answers = 0; } - pub(super) fn question_lost(&mut self, _ts: u64) { - self.peer_stats.rpc_stats.first_consecutive_answer_time = None; + pub(super) fn question_lost(&mut self) { + self.peer_stats.rpc_stats.first_consecutive_seen_ts = None; self.peer_stats.rpc_stats.questions_in_flight -= 1; + self.peer_stats.rpc_stats.recent_lost_answers += 1; + } + pub(super) fn failed_to_send(&mut self, ts: u64, expects_answer: bool) { + if expects_answer { + self.peer_stats.rpc_stats.last_question = Some(ts); + } + self.peer_stats.rpc_stats.failed_to_send += 1; + self.peer_stats.rpc_stats.first_consecutive_seen_ts = None; } } impl Drop for BucketEntry { fn drop(&mut self) { - assert_eq!(self.ref_count, 0); + if self.ref_count != 0 { + panic!( + "bucket entry dropped with non-zero refcount: {:#?}", + self.node_info() + ) + } } } diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 64c30260..e0d8a019 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -75,25 +75,30 @@ impl RoutingTable { let mut cnt = 0; out += &format!("Entries: {}\n", inner.bucket_entry_count); while b < blen { - if inner.buckets[b].entries().len() > 0 { - out += &format!(" Bucket #{}:\n", b); - for e in inner.buckets[b].entries() { + let filtered_entries: Vec<(&DHTKey, &BucketEntry)> = inner.buckets[b] + .entries() + .filter(|e| { let state = e.1.state(cur_ts); - if state >= min_state { - out += &format!( - " {} [{}]\n", - e.0.encode(), - match state { - BucketEntryState::Reliable => "R", - BucketEntryState::Unreliable => "U", - BucketEntryState::Dead => "D", - } - ); - - cnt += 1; - if cnt >= limit { - break; + state >= min_state + }) + .collect(); + if !filtered_entries.is_empty() { + out += &format!(" Bucket #{}:\n", b); + for e in filtered_entries { + let state = e.1.state(cur_ts); + out += &format!( + " {} [{}]\n", + e.0.encode(), + match state { + BucketEntryState::Reliable => "R", + BucketEntryState::Unreliable => "U", + BucketEntryState::Dead => "D", } + ); + + cnt += 1; + if cnt >= limit { + break; } } if cnt >= limit { diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index afd520f1..e2f01d2e 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -72,10 +72,13 @@ impl RoutingTable { } } - pub fn filter_has_valid_signed_node_info(kv: &(&DHTKey, Option<&mut BucketEntry>)) -> bool { + pub fn filter_has_valid_signed_node_info( + kv: &(&DHTKey, Option<&mut BucketEntry>), + own_peer_info_is_valid: bool, + ) -> bool { match &kv.1 { - None => true, - Some(b) => b.has_node_info(), + None => own_peer_info_is_valid, + Some(b) => b.has_valid_signed_node_info(), } } @@ -117,10 +120,11 @@ impl RoutingTable { nodes.push(selfkv); } // add all nodes from buckets + // Can't use with_entries() here due to lifetime issues for b in &mut inner.buckets { for (k, v) in b.entries_mut() { // Don't bother with dead nodes - if !v.check_dead(cur_ts) { + if v.state(cur_ts) >= BucketEntryState::Unreliable { // Apply filter let kv = (k, Some(v)); if filter(&kv) { @@ -159,13 +163,11 @@ impl RoutingTable { // filter |kv| { if kv.1.is_none() { - // filter out self peer, as it is irrelevant to the 'fastest nodes' search - return false; + // always filter out self peer, as it is irrelevant to the 'fastest nodes' search + false + } else { + filter.as_ref().map(|f| f(kv)).unwrap_or(true) } - if filter.is_some() && !filter.as_ref().unwrap()(kv) { - return false; - } - true }, // sort |(a_key, a_entry), (b_key, b_entry)| { @@ -237,16 +239,7 @@ impl RoutingTable { node_count, cur_ts, // filter - |kv| { - if kv.1.is_none() { - // include self peer, as it is relevant to the 'closest nodes' search - return true; - } - if filter.is_some() && !filter.as_ref().unwrap()(kv) { - return false; - } - true - }, + |kv| filter.as_ref().map(|f| f(kv)).unwrap_or(true), // sort |(a_key, a_entry), (b_key, b_entry)| { // same nodes are always the same diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index c04978da..273bef60 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -311,11 +311,11 @@ impl RoutingTable { // Public dial info changed, go through all nodes and reset their 'seen our node info' bit if matches!(domain, RoutingDomain::PublicInternet) { - for bucket in &mut inner.buckets { - for entry in bucket.entries_mut() { - entry.1.set_seen_our_node_info(false); - } - } + let cur_ts = intf::get_timestamp(); + Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Dead, |_, e| { + e.set_seen_our_node_info(false); + Option::<()>::None + }); } Ok(()) @@ -393,26 +393,18 @@ impl RoutingTable { let mut inner = this.inner.lock(); let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); let cur_ts = intf::get_timestamp(); - for bucket in &mut inner.buckets { - for entry in bucket.entries_mut() { - match entry.1.state(cur_ts) { - BucketEntryState::Reliable | BucketEntryState::Unreliable => { - // Only update nodes that haven't seen our node info yet - if !entry.1.has_seen_our_node_info() { - node_refs.push(NodeRef::new( - this.clone(), - *entry.0, - entry.1, - None, - )); - } - } - BucketEntryState::Dead => { - // do nothing - } - } + Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Unreliable, |k, e| { + // Only update nodes that haven't seen our node info yet + if !e.has_seen_our_node_info() { + node_refs.push(NodeRef::new( + this.clone(), + *k, + e, + None, + )); } - } + Option::<()>::None + }); node_refs }; @@ -458,8 +450,8 @@ impl RoutingTable { for bucket in &mut inner.buckets { bucket.kick(0); } - log_rtab!( - "Routing table purge complete. Routing table now has {} nodes", + log_rtab!(debug + "Routing table purge complete. Routing table now has {} nodes", inner.bucket_entry_count ); } @@ -473,7 +465,7 @@ impl RoutingTable { if let Some(dead_node_ids) = bucket.kick(bucket_depth) { // Remove counts inner.bucket_entry_count -= dead_node_ids.len(); - log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count); + log_rtab!(debug "Routing table now has {} nodes", inner.bucket_entry_count); // Now purge the routing table inner vectors //let filter = |k: &DHTKey| dead_node_ids.contains(k); @@ -490,6 +482,34 @@ impl RoutingTable { .unwrap() } + fn get_entry_count(inner: &mut RoutingTableInner, min_state: BucketEntryState) -> usize { + let mut count = 0usize; + let cur_ts = intf::get_timestamp(); + Self::with_entries(inner, cur_ts, min_state, |_, _| { + count += 1; + Option::<()>::None + }); + count + } + + fn with_entries Option>( + inner: &mut RoutingTableInner, + cur_ts: u64, + min_state: BucketEntryState, + mut f: F, + ) -> Option { + for bucket in &mut inner.buckets { + for entry in bucket.entries_mut() { + if entry.1.state(cur_ts) >= min_state { + if let Some(out) = f(entry.0, entry.1) { + return Some(out); + } + } + } + } + None + } + fn drop_node_ref(&self, node_id: DHTKey) { // Reduce ref count on entry let mut inner = self.inner.lock(); @@ -536,7 +556,8 @@ impl RoutingTable { None => { // Make new entry inner.bucket_entry_count += 1; - log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count); + let cnt = inner.bucket_entry_count; + log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count(&mut *inner, BucketEntryState::Unreliable)); let bucket = &mut inner.buckets[idx]; let nr = bucket.add_entry(node_id); @@ -639,38 +660,32 @@ impl RoutingTable { let mut best_inbound_relay: Option = None; // Iterate all known nodes for candidates - for b in &mut inner.buckets { - for (k, entry) in b.entries_mut() { - // Ensure it's not dead - if !matches!(entry.state(cur_ts), BucketEntryState::Dead) { - // Ensure this node is not on our local network - if !entry - .local_node_info() - .map(|l| l.has_dial_info()) - .unwrap_or(false) - { - // Ensure we have the node's status - if let Some(node_status) = &entry.peer_stats().status { - // Ensure the node will relay - if node_status.will_relay { - if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { - if best_inbound_relay.operate(|best| { - BucketEntry::cmp_fastest_reliable(cur_ts, best, entry) - }) == std::cmp::Ordering::Greater - { - *best_inbound_relay = - NodeRef::new(self.clone(), *k, entry, None); - } - } else { - best_inbound_relay = - Some(NodeRef::new(self.clone(), *k, entry, None)); - } + Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Unreliable, |k, e| { + // Ensure this node is not on our local network + if !e + .local_node_info() + .map(|l| l.has_dial_info()) + .unwrap_or(false) + { + // Ensure we have the node's status + if let Some(node_status) = &e.peer_stats().status { + // Ensure the node will relay + if node_status.will_relay { + if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { + if best_inbound_relay + .operate(|best| BucketEntry::cmp_fastest_reliable(cur_ts, best, e)) + == std::cmp::Ordering::Greater + { + *best_inbound_relay = NodeRef::new(self.clone(), *k, e, None); } + } else { + best_inbound_relay = Some(NodeRef::new(self.clone(), *k, e, None)); } } } } - } + Option::<()>::None + }); best_inbound_relay } @@ -771,11 +786,105 @@ impl RoutingTable { } } - async fn resolve_bootstrap(&self, bootstrap: Vec) -> Result, String> { - let mut out = Vec::::new(); + // Bootstrap lookup process + async fn resolve_bootstrap(&self, bootstrap: Vec) -> Result, String> { + let mut out = Vec::::new(); + + // Resolve from bootstrap root to bootstrap hostnames + let mut bsnames = Vec::::new(); for bh in bootstrap { - // + // Get TXT record for bootstrap (bootstrap.veilid.net, or similar) + let records = intf::txt_lookup(&bh).await?; + for record in records { + // Split the bootstrap name record by commas + for rec in record.split(',') { + let rec = rec.trim(); + // If the name specified is fully qualified, go with it + let bsname = if rec.ends_with('.') { + rec.to_string() + } + // If the name is not fully qualified, prepend it to the bootstrap name + else { + format!("{}.{}", rec, bh) + }; + + // Add to the list of bootstrap name to look up + bsnames.push(bsname); + } + } } + + // Get bootstrap nodes from hostnames concurrently + let mut unord = FuturesUnordered::new(); + for bsname in bsnames { + unord.push(async move { + // look up boostrap node txt records + let bsnirecords = match intf::txt_lookup(&bsname).await { + Err(e) => { + warn!("bootstrap node txt lookup failed for {}: {}", bsname, e); + return None; + } + Ok(v) => v, + }; + // for each record resolve into node dial info strings + let mut nodedialinfos: Vec = Vec::new(); + for bsnirecord in bsnirecords { + // split bootstrap node record by commas. example: + // 7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,tcp://bootstrap-dev-alpha.veilid.net:5150,udp://bootstrap-dev-alpha.veilid.net:5150,ws://bootstrap-dev-alpha.veilid.net:5150/ws + let mut records = bsnirecord.split(',').map(|x| x.trim()); + let node_id_str = match records.next() { + Some(v) => v, + None => { + warn!("no node id specified in bootstrap node txt record"); + continue; + } + }; + // Decode the node id + let node_id_key = match DHTKey::try_decode(node_id_str) { + Ok(v) => v, + Err(e) => { + warn!( + "Invalid node id in bootstrap node record {}: {}", + node_id_str, e + ); + continue; + } + }; + + // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node + if self.node_id() == node_id_key { + continue; + } + + // Resolve each record and store in node dial infos list + let node_id = NodeId::new(node_id_key); + for rec in records { + let rec = rec.trim(); + let dial_infos = match DialInfo::try_vec_from_url(rec) { + Ok(dis) => dis, + Err(e) => { + warn!("Couldn't resolve bootstrap node dial info {}: {}", rec, e); + continue; + } + }; + + for dial_info in dial_infos { + nodedialinfos.push(NodeDialInfo { + node_id: node_id.clone(), + dial_info, + }) + } + } + } + Some(nodedialinfos) + }); + } + while let Some(ndis) = unord.next().await { + if let Some(mut ndis) = ndis { + out.append(&mut ndis); + } + } + Ok(out) } @@ -791,8 +900,18 @@ impl RoutingTable { log_rtab!("--- bootstrap_task"); // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) - let bootstrap_nodes = if !bootstrap_nodes.is_empty() { - bootstrap_nodes + let bootstrap_node_dial_infos = if !bootstrap_nodes.is_empty() { + let mut bsnvec = Vec::new(); + for b in bootstrap_nodes { + let ndis = NodeDialInfo::from_str(b.as_str()) + .map_err(map_to_string) + .map_err(logthru_rtab!( + "Invalid node dial info in bootstrap entry: {}", + b + ))?; + bsnvec.push(ndis); + } + bsnvec } else { // Resolve bootstrap servers and recurse their TXT entries self.resolve_bootstrap(bootstrap).await? @@ -800,16 +919,13 @@ impl RoutingTable { // Map all bootstrap entries to a single key with multiple dialinfo let mut bsmap: BTreeMap> = BTreeMap::new(); - for b in bootstrap_nodes { - let ndis = NodeDialInfo::from_str(b.as_str()) - .map_err(map_to_string) - .map_err(logthru_rtab!("Invalid dial info in bootstrap entry: {}", b))?; - let node_id = ndis.node_id.key; + for ndi in bootstrap_node_dial_infos { + let node_id = ndi.node_id.key; bsmap .entry(node_id) .or_insert_with(Vec::new) .push(DialInfoDetail { - dial_info: ndis.dial_info, + dial_info: ndi.dial_info, class: DialInfoClass::Direct, // Bootstraps are always directly reachable }); } @@ -846,13 +962,15 @@ impl RoutingTable { "bootstrap at {:?} did not return valid signed node info", nr ); - // xxx: delete the node? + // If this node info is invalid, it will time out after being unpingable } else { // otherwise this bootstrap is valid, lets ask it to find ourselves now this.reverse_find_node(nr, true).await } }); } + + // Wait for all bootstrap operations to complete before we complete the singlefuture while unord.next().await.is_some() {} Ok(()) } @@ -865,15 +983,20 @@ impl RoutingTable { async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> { log_rtab!("--- peer_minimum_refresh task"); - // get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too + // get list of all peers we know about, even the unreliable ones, and ask them to find nodes close to our node too let noderefs = { let mut inner = self.inner.lock(); let mut noderefs = Vec::::with_capacity(inner.bucket_entry_count); - for b in &mut inner.buckets { - for (k, entry) in b.entries_mut() { - noderefs.push(NodeRef::new(self.clone(), *k, entry, None)) - } - } + let cur_ts = intf::get_timestamp(); + Self::with_entries( + &mut *inner, + cur_ts, + BucketEntryState::Unreliable, + |k, entry| { + noderefs.push(NodeRef::new(self.clone(), *k, entry, None)); + Option::<()>::None + }, + ); noderefs }; log_rtab!(" refreshing with nodes: {:?}", noderefs); @@ -892,32 +1015,31 @@ impl RoutingTable { // Ping each node in the routing table if they need to be pinged // to determine their reliability async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> { - log_rtab!("--- ping_validator task"); + // log_rtab!("--- ping_validator task"); let rpc = self.rpc_processor(); let netman = self.network_manager(); let relay_node_id = netman.relay_node().map(|nr| nr.node_id()); let mut inner = self.inner.lock(); - for b in &mut inner.buckets { - for (k, entry) in b.entries_mut() { - if entry.needs_ping(k, cur_ts, relay_node_id) { - let nr = NodeRef::new(self.clone(), *k, entry, None); - log_rtab!( - " --- ping validating: {:?} ({})", - nr, - entry.state_debug_info(cur_ts) - ); - intf::spawn_local(rpc.clone().rpc_call_status(nr)).detach(); - } + Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Unreliable, |k, e| { + if e.needs_ping(k, cur_ts, relay_node_id) { + let nr = NodeRef::new(self.clone(), *k, e, None); + log_rtab!( + " --- ping validating: {:?} ({})", + nr, + e.state_debug_info(cur_ts) + ); + intf::spawn_local(rpc.clone().rpc_call_status(nr)).detach(); } - } + Option::<()>::None + }); Ok(()) } // Compute transfer statistics to determine how 'fast' a node is async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { - log_rtab!("--- rolling_transfers task"); + // log_rtab!("--- rolling_transfers task"); let inner = &mut *self.inner.lock(); // Roll our own node's transfers @@ -940,8 +1062,8 @@ impl RoutingTable { // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs self.unlocked_inner.rolling_transfers_task.tick().await?; - // If routing table is empty, then add the bootstrap nodes to it - if self.inner.lock().bucket_entry_count == 0 { + // If routing table has no live entries, then add the bootstrap nodes to it + if Self::get_entry_count(&mut *self.inner.lock(), BucketEntryState::Unreliable) == 0 { self.unlocked_inner.bootstrap_task.tick().await?; } @@ -950,7 +1072,9 @@ impl RoutingTable { let c = self.config.get(); c.network.dht.min_peer_count as usize }; - if self.inner.lock().bucket_entry_count < min_peer_count { + if Self::get_entry_count(&mut *self.inner.lock(), BucketEntryState::Unreliable) + < min_peer_count + { self.unlocked_inner.peer_minimum_refresh_task.tick().await?; } // Ping validate some nodes to groom the table @@ -987,13 +1111,13 @@ impl RoutingTable { e.question_rcvd(ts, bytes); }) } - pub fn stats_answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) { + pub fn stats_answer_sent(&self, node_ref: NodeRef, bytes: u64) { self.inner .lock() .self_transfer_stats_accounting .add_up(bytes); node_ref.operate(|e| { - e.answer_sent(ts, bytes); + e.answer_sent(bytes); }) } pub fn stats_answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) { @@ -1009,9 +1133,14 @@ impl RoutingTable { e.answer_rcvd(send_ts, recv_ts, bytes); }) } - pub fn stats_question_lost(&self, node_ref: NodeRef, ts: u64) { + pub fn stats_question_lost(&self, node_ref: NodeRef) { node_ref.operate(|e| { - e.question_lost(ts); + e.question_lost(); + }) + } + pub fn stats_failed_to_send(&self, node_ref: NodeRef, ts: u64, expects_answer: bool) { + node_ref.operate(|e| { + e.failed_to_send(ts, expects_answer); }) } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 593d63f3..d7f336fa 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -214,8 +214,8 @@ impl NodeRef { // Get the last connection and the last time we saw anything with this connection let (last_connection, last_seen) = self.operate(|e| { if let Some((last_connection, connection_ts)) = e.last_connection() { - if let Some(last_seen) = e.peer_stats().last_seen { - Some((last_connection, u64::max(last_seen, connection_ts))) + if let Some(last_seen_ts) = e.peer_stats().rpc_stats.last_seen_ts { + Some((last_connection, u64::max(last_seen_ts, connection_ts))) } else { Some((last_connection, connection_ts)) } diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs index 6ef8505f..708226c3 100644 --- a/veilid-core/src/rpc_processor/debug.rs +++ b/veilid-core/src/rpc_processor/debug.rs @@ -88,6 +88,7 @@ macro_rules! map_error_panic { } impl RPCProcessor { + #[allow(dead_code)] pub(super) fn get_rpc_request_debug_info( &self, dest: &Destination, @@ -104,6 +105,7 @@ impl RPCProcessor { self.get_rpc_message_debug_info(message) ) } + #[allow(dead_code)] pub(super) fn get_rpc_reply_debug_info( &self, request_rpcreader: &RPCMessageReader, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index affbd947..15e21f50 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -348,16 +348,12 @@ impl RPCProcessor { .map_err(map_error_internal!("invalid timeout"))?; // wait for eventualvalue let start_ts = get_timestamp(); - timeout(timeout_ms, waitable_reply.eventual.instance()) + let res = timeout(timeout_ms, waitable_reply.eventual.instance()) .await .map_err(|_| RPCError::Timeout)?; - match waitable_reply.eventual.take_value() { - None => panic!("there should be a reply value but there wasn't"), - Some(rpcreader) => { - let end_ts = get_timestamp(); - Ok((rpcreader, end_ts - start_ts)) - } - } + let rpcreader = res.take_value().unwrap(); + let end_ts = get_timestamp(); + Ok((rpcreader, end_ts - start_ts)) } async fn wait_for_reply( &self, @@ -369,7 +365,7 @@ impl RPCProcessor { self.cancel_op_id_waiter(waitable_reply.op_id); self.routing_table() - .stats_question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts); + .stats_question_lost(waitable_reply.node_ref.clone()); } Ok((rpcreader, _)) => { // Note that we definitely received this node info since we got a reply @@ -396,7 +392,7 @@ impl RPCProcessor { message: capnp::message::Reader, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result, RPCError> { - log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec)); + //log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec)); let (op_id, wants_answer) = { let operation = message @@ -539,6 +535,7 @@ impl RPCProcessor { // send question let bytes = out.len() as u64; + let send_ts = get_timestamp(); let send_data_kind = match self .network_manager() .send_envelope(node_ref.clone(), Some(out_node_id), out) @@ -552,12 +549,15 @@ impl RPCProcessor { if eventual.is_some() { self.cancel_op_id_waiter(op_id); } + + self.routing_table() + .stats_failed_to_send(node_ref, send_ts, wants_answer); + return Err(e); } }; // Successfully sent - let send_ts = get_timestamp(); self.routing_table() .stats_question_sent(node_ref.clone(), send_ts, bytes, wants_answer); @@ -586,7 +586,7 @@ impl RPCProcessor { reply_msg: capnp::message::Reader, safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result<(), RPCError> { - log_rpc!(self.get_rpc_reply_debug_info(&request_rpcreader, &reply_msg, &safety_route_spec)); + // log_rpc!(self.get_rpc_reply_debug_info(&request_rpcreader, &reply_msg, &safety_route_spec)); // let out_node_id; @@ -721,16 +721,19 @@ impl RPCProcessor { // Send the reply let bytes = out.len() as u64; + let send_ts = get_timestamp(); self.network_manager() .send_envelope(node_ref.clone(), Some(out_node_id), out) .await - .map_err(RPCError::Internal)?; + .map_err(RPCError::Internal) + .map_err(|e| { + self.routing_table() + .stats_failed_to_send(node_ref.clone(), send_ts, false); + e + })?; // Reply successfully sent - let send_ts = get_timestamp(); - - self.routing_table() - .stats_answer_sent(node_ref, send_ts, bytes); + self.routing_table().stats_answer_sent(node_ref, bytes); Ok(()) } @@ -982,10 +985,14 @@ impl RPCProcessor { // find N nodes closest to the target node in our routing table let own_peer_info = routing_table.get_own_peer_info(); + let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid(); + let closest_nodes = routing_table.find_closest_nodes( target_node_id, // filter - Some(Box::new(RoutingTable::filter_has_valid_signed_node_info)), + Some(Box::new(move |kv| { + RoutingTable::filter_has_valid_signed_node_info(kv, own_peer_info_is_valid) + })), // transform |e| RoutingTable::transform_to_peer_info(e, &own_peer_info), ); @@ -1569,7 +1576,7 @@ impl RPCProcessor { .await?; // Wait for receipt - match eventual_value.await { + match eventual_value.await.take_value().unwrap() { ReceiptEvent::Returned(_) => Ok(true), ReceiptEvent::Expired => Ok(false), ReceiptEvent::Cancelled => { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 40f2769f..1efa902a 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -342,6 +342,9 @@ pub struct NodeInfo { } impl NodeInfo { + pub fn is_valid(&self) -> bool { + !matches!(self.network_class, NetworkClass::Invalid) + } pub fn first_filtered_dial_info_detail(&self, filter: F) -> Option where F: Fn(&DialInfoDetail) -> bool, @@ -1036,8 +1039,9 @@ impl DialInfo { } } - pub fn try_vec_from_url(url: String) -> Result, VeilidAPIError> { - let split_url = SplitUrl::from_str(&url) + pub fn try_vec_from_url>(url: S) -> Result, VeilidAPIError> { + let url = url.as_ref(); + let split_url = SplitUrl::from_str(url) .map_err(|e| parse_error!(format!("unable to split url: {}", e), url))?; let port = match split_url.scheme.as_str() { @@ -1070,11 +1074,11 @@ impl DialInfo { "tcp" => Self::tcp_from_socketaddr(sa), "ws" => Self::try_ws( SocketAddress::from_socket_addr(sa).to_canonical(), - url.clone(), + url.to_string(), )?, "wss" => Self::try_wss( SocketAddress::from_socket_addr(sa).to_canonical(), - url.clone(), + url.to_string(), )?, _ => { unreachable!("Invalid dial info url scheme") @@ -1202,6 +1206,10 @@ impl SignedNodeInfo { timestamp: intf::get_timestamp(), } } + + pub fn is_valid(&self) -> bool { + self.signature.valid && self.node_info.is_valid() + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -1281,7 +1289,7 @@ impl MatchesDialInfoFilter for ConnectionDescriptor { if !self.matches_peer_scope(filter.peer_scope) { return false; } - if filter.protocol_set.contains(self.protocol_type()) { + if !filter.protocol_set.contains(self.protocol_type()) { return false; } if let Some(at) = filter.address_type { @@ -1356,19 +1364,20 @@ pub struct RPCStats { pub messages_sent: u32, // number of rpcs that have been sent in the total_time range pub messages_rcvd: u32, // number of rpcs that have been received in the total_time range pub questions_in_flight: u32, // number of questions issued that have yet to be answered - //pub last_question: Option, // when the peer was last questioned and we want an answer - pub first_consecutive_answer_time: Option, // the timestamp of the first answer in a series of consecutive questions + pub last_question: Option, // when the peer was last questioned (either successfully or not) and we wanted an answer + pub last_seen_ts: Option, // when the peer was last seen for any reason, including when we first attempted to reach out to it + pub first_consecutive_seen_ts: Option, // the timestamp of the first consecutive proof-of-life for this node (an answer or received question) pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability + pub failed_to_send: u32, // number of messages that have failed to send since we last successfully sent one } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct PeerStats { pub time_added: u64, // when the peer was added to the routing table - pub last_seen: Option, // when the peer was last seen for any reason, including when we first attempted to reach out to it - pub rpc_stats: RPCStats, // information about RPCs + pub rpc_stats: RPCStats, // information about RPCs pub latency: Option, // latencies for communications with the peer pub transfer: TransferStatsDownUp, // Stats for communications with the peer - pub status: Option, // Last known node status + pub status: Option, // Last known node status } cfg_if! { diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index ea3d0c71..d6c5d4f4 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -296,6 +296,8 @@ impl VeilidConfig { get_config!(inner.network.max_connections_per_ip6_prefix_size); get_config!(inner.network.max_connection_frequency_per_min); get_config!(inner.network.client_whitelist_timeout_ms); + get_config!(inner.network.reverse_connection_receipt_time_ms); + get_config!(inner.network.hole_punch_receipt_time_ms); get_config!(inner.network.bootstrap); get_config!(inner.network.bootstrap_nodes); get_config!(inner.network.routing_table.limit_over_attached); diff --git a/veilid-core/src/xx/async_peek_stream.rs b/veilid-core/src/xx/async_peek_stream.rs index 975cf162..b7797b3c 100644 --- a/veilid-core/src/xx/async_peek_stream.rs +++ b/veilid-core/src/xx/async_peek_stream.rs @@ -20,6 +20,96 @@ where } } +//////// +/// + +pub struct Peek<'a> { + aps: AsyncPeekStream, + buf: &'a mut [u8], +} + +impl Unpin for Peek<'_> {} + +impl Future for Peek<'_> { + type Output = std::io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + + let mut inner = this.aps.inner.lock(); + let inner = &mut *inner; + // + let buf_len = this.buf.len(); + let mut copy_len = buf_len; + if buf_len > inner.peekbuf_len { + // + inner.peekbuf.resize(buf_len, 0u8); + let mut read_future = inner + .stream + .read(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]); + let read_len = match Pin::new(&mut read_future).poll(cx) { + Poll::Pending => { + inner.peekbuf.resize(inner.peekbuf_len, 0u8); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + return Poll::Ready(Err(e)); + } + Poll::Ready(Ok(v)) => v, + }; + inner.peekbuf_len += read_len; + inner.peekbuf.resize(inner.peekbuf_len, 0u8); + copy_len = inner.peekbuf_len; + } + this.buf[..copy_len].copy_from_slice(&inner.peekbuf[..copy_len]); + Poll::Ready(Ok(copy_len)) + } +} + +//////// +/// + +pub struct PeekExact<'a> { + aps: AsyncPeekStream, + buf: &'a mut [u8], +} + +impl Unpin for PeekExact<'_> {} + +impl Future for PeekExact<'_> { + type Output = std::io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + + let mut inner = this.aps.inner.lock(); + let inner = &mut *inner; + // + let buf_len = this.buf.len(); + let mut copy_len = buf_len; + if buf_len > inner.peekbuf_len { + // + inner.peekbuf.resize(buf_len, 0u8); + let mut read_future = inner + .stream + .read_exact(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]); + match Pin::new(&mut read_future).poll(cx) { + Poll::Pending => { + inner.peekbuf.resize(inner.peekbuf_len, 0u8); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + return Poll::Ready(Err(e)); + } + Poll::Ready(Ok(())) => (), + }; + inner.peekbuf_len = buf_len; + copy_len = inner.peekbuf_len; + } + this.buf[..copy_len].copy_from_slice(&inner.peekbuf[..copy_len]); + Poll::Ready(Ok(copy_len)) + } +} ///////// /// struct AsyncPeekStreamInner { @@ -50,60 +140,18 @@ impl AsyncPeekStream { } } - pub async fn peek(&'_ self, buf: &'_ mut [u8]) -> Result { - let (mut stream, mut peekbuf, mut peekbuf_len) = { - let inner = self.inner.lock(); - ( - inner.stream.clone_stream(), - inner.peekbuf.clone(), - inner.peekbuf_len, - ) - }; - // - let buf_len = buf.len(); - let mut copy_len = buf_len; - if buf_len > peekbuf_len { - // - peekbuf.resize(buf_len, 0u8); - let read_len = stream - .read(&mut peekbuf.as_mut_slice()[peekbuf_len..buf_len]) - .await?; - peekbuf_len += read_len; - copy_len = peekbuf_len; + pub fn peek<'a>(&'a self, buf: &'a mut [u8]) -> Peek<'a> { + Peek::<'a> { + aps: self.clone(), + buf, } - buf[..copy_len].copy_from_slice(&peekbuf[..copy_len]); - - let mut inner = self.inner.lock(); - inner.peekbuf = peekbuf; - inner.peekbuf_len = peekbuf_len; - Ok(copy_len) } - pub async fn peek_exact(&'_ self, buf: &'_ mut [u8]) -> Result<()> { - let (mut stream, mut peekbuf, mut peekbuf_len) = { - let inner = self.inner.lock(); - ( - inner.stream.clone_stream(), - inner.peekbuf.clone(), - inner.peekbuf_len, - ) - }; - // - let buf_len = buf.len(); - if buf_len > peekbuf_len { - // - peekbuf.resize(buf_len, 0u8); - stream - .read_exact(&mut peekbuf.as_mut_slice()[peekbuf_len..buf_len]) - .await?; - peekbuf_len = buf_len; + pub fn peek_exact<'a>(&'a self, buf: &'a mut [u8]) -> PeekExact<'a> { + PeekExact::<'a> { + aps: self.clone(), + buf, } - buf.copy_from_slice(&peekbuf[..buf_len]); - - let mut inner = self.inner.lock(); - inner.peekbuf = peekbuf; - inner.peekbuf_len = peekbuf_len; - Ok(()) } } diff --git a/veilid-core/src/xx/eventual_value.rs b/veilid-core/src/xx/eventual_value.rs index 3c65f0b5..ab0825aa 100644 --- a/veilid-core/src/xx/eventual_value.rs +++ b/veilid-core/src/xx/eventual_value.rs @@ -62,7 +62,7 @@ pub struct EventualValueFuture { } impl Future for EventualValueFuture { - type Output = (); + type Output = EventualValue; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { let this = &mut *self; let out = { @@ -76,7 +76,7 @@ impl Future for EventualValueFuture { for w in wakers { w.wake(); } - task::Poll::::Ready(()) + task::Poll::::Ready(this.eventual.clone()) } } } diff --git a/veilid-core/src/xx/log_thru.rs b/veilid-core/src/xx/log_thru.rs index 8beb05ee..166c966e 100644 --- a/veilid-core/src/xx/log_thru.rs +++ b/veilid-core/src/xx/log_thru.rs @@ -97,6 +97,14 @@ macro_rules! log_rtab { (warn $fmt:literal, $($arg:expr),+) => { warn!(target:"rtab", $fmt, $($arg),+); }; + (debug $text:expr) => { debug!( + target: "rtab", + "{}", + $text, + )}; + (debug $fmt:literal, $($arg:expr),+) => { + debug!(target:"rtab", $fmt, $($arg),+); + }; ($text:expr) => {trace!( target: "rtab", "{}", @@ -230,7 +238,7 @@ macro_rules! logthru { (error $target:literal) => (|e__| { error!( target: $target, - "[{}]", + "[{:?}]", e__, ); e__ @@ -238,7 +246,7 @@ macro_rules! logthru { (error $target:literal, $text:literal) => (|e__| { error!( target: $target, - "[{}] {}", + "[{:?}] {}", e__, $text ); @@ -247,7 +255,7 @@ macro_rules! logthru { (error $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| { error!( target: $target, - concat!("[{}] ", $fmt), + concat!("[{:?}] ", $fmt), e__, $($arg),+ ); @@ -257,7 +265,7 @@ macro_rules! logthru { (warn $target:literal) => (|e__| { warn!( target: $target, - "[{}]", + "[{:?}]", e__, ); e__ @@ -265,7 +273,7 @@ macro_rules! logthru { (warn $target:literal, $text:literal) => (|e__| { warn!( target: $target, - "[{}] {}", + "[{:?}] {}", e__, $text ); @@ -274,7 +282,7 @@ macro_rules! logthru { (warn $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| { warn!( target: $target, - concat!("[{}] ", $fmt), + concat!("[{:?}] ", $fmt), e__, $($arg),+ ); @@ -284,7 +292,7 @@ macro_rules! logthru { (debug $target:literal) => (|e__| { debug!( target: $target, - "[{}]", + "[{:?}]", e__, ); e__ @@ -292,7 +300,7 @@ macro_rules! logthru { (debug $target:literal, $text:literal) => (|e__| { debug!( target: $target, - "[{}] {}", + "[{:?}] {}", e__, $text ); @@ -301,7 +309,7 @@ macro_rules! logthru { (debug $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| { debug!( target: $target, - concat!("[{}] ", $fmt), + concat!("[{:?}] ", $fmt), e__, $($arg),+ ); @@ -311,7 +319,7 @@ macro_rules! logthru { ($target:literal) => (|e__| { trace!( target: $target, - "[{}]", + "[{:?}]", e__, ); e__ @@ -319,7 +327,7 @@ macro_rules! logthru { ($target:literal, $text:literal) => (|e__| { trace!( target: $target, - "[{}] {}", + "[{:?}] {}", e__, $text ); @@ -328,7 +336,7 @@ macro_rules! logthru { ($target:literal, $fmt:literal, $($arg:expr),+) => (|e__| { trace!( target: $target, - concat!("[{}] ", $fmt), + concat!("[{:?}] ", $fmt), e__, $($arg),+ ); diff --git a/veilid-core/src/xx/single_shot_eventual.rs b/veilid-core/src/xx/single_shot_eventual.rs index bef2a41f..99c2a567 100644 --- a/veilid-core/src/xx/single_shot_eventual.rs +++ b/veilid-core/src/xx/single_shot_eventual.rs @@ -2,38 +2,43 @@ use super::*; pub struct SingleShotEventual where - T: Unpin + Clone, + T: Unpin, { - eventual: EventualValueClone, - drop_value: T, + eventual: EventualValue, + drop_value: Option, } impl Drop for SingleShotEventual where - T: Unpin + Clone, + T: Unpin, { fn drop(&mut self) { - self.eventual.resolve(self.drop_value.clone()); + if let Some(drop_value) = self.drop_value.take() { + self.eventual.resolve(drop_value); + } } } impl SingleShotEventual where - T: Unpin + Clone, + T: Unpin, { - pub fn new(drop_value: T) -> Self { + pub fn new(drop_value: Option) -> Self { Self { - eventual: EventualValueClone::new(), + eventual: EventualValue::new(), drop_value, } } // Can only call this once, it consumes the eventual - pub fn resolve(self, value: T) -> EventualResolvedFuture> { + pub fn resolve(mut self, value: T) -> EventualResolvedFuture> { + // If we resolve, we don't want to resolve again to the drop value + self.drop_value = None; + // Resolve to the specified value self.eventual.resolve(value) } - pub fn instance(&self) -> EventualValueCloneFuture { + pub fn instance(&self) -> EventualValueFuture { self.eventual.instance() } } diff --git a/veilid-flutter/example/lib/config.dart b/veilid-flutter/example/lib/config.dart index e1d3754c..a1e58a45 100644 --- a/veilid-flutter/example/lib/config.dart +++ b/veilid-flutter/example/lib/config.dart @@ -49,7 +49,7 @@ Future getDefaultVeilidConfig() async { holePunchReceiptTimeMs: 5000, nodeId: "", nodeIdSecret: "", - bootstrap: [], + bootstrap: ["bootstrap-dev.veilid.net"], bootstrapNodes: [], routingTable: VeilidConfigRoutingTable( limitOverAttached: 64, diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index 9bf21181..f1ab7349 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -63,7 +63,7 @@ core: hole_punch_receipt_time_ms: 5000 node_id: '' node_id_secret: '' - bootstrap: ['bootstrap.veilid.net'] + bootstrap: ['bootstrap-dev.veilid.net'] bootstrap_nodes: [] routing_table: limit_over_attached: 64