many fixes for bootstrap and public internet connectivity
This commit is contained in:
parent
9a54ee052c
commit
424ceedfd8
1
package/debian/veilid-server/DEBIAN/conffiles
Normal file
1
package/debian/veilid-server/DEBIAN/conffiles
Normal file
@ -0,0 +1 @@
|
|||||||
|
/etc/veilid-server/veilid-server.conf
|
@ -1,4 +1,5 @@
|
|||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
if [ -d /run/systemd/system ]; then
|
if [ -d /run/systemd/system ]; then
|
||||||
|
@ -4,3 +4,4 @@ core:
|
|||||||
dht:
|
dht:
|
||||||
min_peer_count: 1
|
min_peer_count: 1
|
||||||
enable_local_peer_scope: true
|
enable_local_peer_scope: true
|
||||||
|
bootstrap: []
|
||||||
|
@ -97,6 +97,7 @@ impl ConnectionManager {
|
|||||||
inner: &mut ConnectionManagerInner,
|
inner: &mut ConnectionManagerInner,
|
||||||
conn: NetworkConnection,
|
conn: NetworkConnection,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
|
log_net!("on_new_connection_internal: {:?}", conn);
|
||||||
let tx = inner
|
let tx = inner
|
||||||
.connection_add_channel_tx
|
.connection_add_channel_tx
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
@ -316,6 +316,11 @@ impl Network {
|
|||||||
&peer_socket_addr,
|
&peer_socket_addr,
|
||||||
&descriptor.local.map(|sa| sa.to_socket_addr()),
|
&descriptor.local.map(|sa| sa.to_socket_addr()),
|
||||||
) {
|
) {
|
||||||
|
log_net!(
|
||||||
|
"send_data_to_existing_connection connectionless to {:?}",
|
||||||
|
descriptor
|
||||||
|
);
|
||||||
|
|
||||||
ph.clone()
|
ph.clone()
|
||||||
.send_message(data, peer_socket_addr)
|
.send_message(data, peer_socket_addr)
|
||||||
.await
|
.await
|
||||||
@ -334,6 +339,8 @@ impl Network {
|
|||||||
|
|
||||||
// Try to send to the exact existing connection if one exists
|
// Try to send to the exact existing connection if one exists
|
||||||
if let Some(conn) = self.connection_manager().get_connection(descriptor).await {
|
if let Some(conn) = self.connection_manager().get_connection(descriptor).await {
|
||||||
|
log_net!("send_data_to_existing_connection to {:?}", descriptor);
|
||||||
|
|
||||||
// connection exists, send over it
|
// connection exists, send over it
|
||||||
conn.send(data).await.map_err(logthru_net!())?;
|
conn.send(data).await.map_err(logthru_net!())?;
|
||||||
|
|
||||||
|
@ -77,10 +77,15 @@ impl Network {
|
|||||||
protocol_handlers: &[Box<dyn ProtocolAcceptHandler>],
|
protocol_handlers: &[Box<dyn ProtocolAcceptHandler>],
|
||||||
) -> Result<Option<NetworkConnection>, String> {
|
) -> Result<Option<NetworkConnection>, String> {
|
||||||
for ah in protocol_handlers.iter() {
|
for ah in protocol_handlers.iter() {
|
||||||
if let Some(nc) = ah.on_accept(stream.clone(), addr).await? {
|
if let Some(nc) = ah
|
||||||
|
.on_accept(stream.clone(), addr)
|
||||||
|
.await
|
||||||
|
.map_err(logthru_net!())?
|
||||||
|
{
|
||||||
return Ok(Some(nc));
|
return Ok(Some(nc));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,7 +110,7 @@ impl Network {
|
|||||||
let std_listener: std::net::TcpListener = socket.into();
|
let std_listener: std::net::TcpListener = socket.into();
|
||||||
let listener = TcpListener::from(std_listener);
|
let listener = TcpListener::from(std_listener);
|
||||||
|
|
||||||
trace!("spawn_socket_listener: binding successful to {}", addr);
|
debug!("spawn_socket_listener: binding successful to {}", addr);
|
||||||
|
|
||||||
// Create protocol handler records
|
// Create protocol handler records
|
||||||
let listener_state = Arc::new(RwLock::new(ListenerState::new()));
|
let listener_state = Arc::new(RwLock::new(ListenerState::new()));
|
||||||
@ -140,7 +145,7 @@ impl Network {
|
|||||||
};
|
};
|
||||||
// XXX limiting
|
// XXX limiting
|
||||||
|
|
||||||
trace!("TCP connection from: {}", addr);
|
log_net!("TCP connection from: {}", addr);
|
||||||
|
|
||||||
// Create a stream we can peek on
|
// Create a stream we can peek on
|
||||||
let ps = AsyncPeekStream::new(tcp_stream);
|
let ps = AsyncPeekStream::new(tcp_stream);
|
||||||
@ -166,6 +171,7 @@ impl Network {
|
|||||||
|
|
||||||
// Check is this could be TLS
|
// Check is this could be TLS
|
||||||
let ls = listener_state.read().clone();
|
let ls = listener_state.read().clone();
|
||||||
|
|
||||||
let conn = if ls.tls_acceptor.is_some() && first_packet[0] == 0x16 {
|
let conn = if ls.tls_acceptor.is_some() && first_packet[0] == 0x16 {
|
||||||
this.try_tls_handlers(
|
this.try_tls_handlers(
|
||||||
ls.tls_acceptor.as_ref().unwrap(),
|
ls.tls_acceptor.as_ref().unwrap(),
|
||||||
@ -178,28 +184,34 @@ impl Network {
|
|||||||
} else {
|
} else {
|
||||||
this.try_handlers(ps, addr, &ls.protocol_handlers).await
|
this.try_handlers(ps, addr, &ls.protocol_handlers).await
|
||||||
};
|
};
|
||||||
|
|
||||||
let conn = match conn {
|
let conn = match conn {
|
||||||
Ok(Some(c)) => c,
|
Ok(Some(c)) => {
|
||||||
|
log_net!("protocol handler found for {:?}: {:?}", addr, c);
|
||||||
|
c
|
||||||
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
// No protocol handlers matched? drop it.
|
// No protocol handlers matched? drop it.
|
||||||
|
log_net!(warn "no protocol handler for connection from {:?}", addr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(e) => {
|
||||||
// Failed to negotiate connection? drop it.
|
// Failed to negotiate connection? drop it.
|
||||||
|
log_net!(warn "failed to negotiate connection from {:?}: {}", addr, e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Register the new connection in the connection manager
|
// Register the new connection in the connection manager
|
||||||
if let Err(e) = connection_manager.on_new_connection(conn).await {
|
if let Err(e) = connection_manager.on_new_connection(conn).await {
|
||||||
error!("failed to register new connection: {}", e);
|
log_net!(error "failed to register new connection: {}", e);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
trace!("exited incoming loop for {}", addr);
|
log_net!(debug "exited incoming loop for {}", addr);
|
||||||
// Remove our listener state from this address if we're stopping
|
// Remove our listener state from this address if we're stopping
|
||||||
this.inner.lock().listener_states.remove(&addr);
|
this.inner.lock().listener_states.remove(&addr);
|
||||||
trace!("listener state removed for {}", addr);
|
log_net!(debug "listener state removed for {}", addr);
|
||||||
|
|
||||||
// If this happened our low-level listener socket probably died
|
// If this happened our low-level listener socket probably died
|
||||||
// so it's time to restart the network
|
// so it's time to restart the network
|
||||||
|
@ -100,7 +100,7 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> Result<Socket, String> {
|
|||||||
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
|
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
|
||||||
.map_err(map_to_string)
|
.map_err(map_to_string)
|
||||||
.map_err(logthru_net!("failed to create TCP socket"))?;
|
.map_err(logthru_net!("failed to create TCP socket"))?;
|
||||||
if let Err(e) = socket.set_linger(None) {
|
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
|
||||||
log_net!(error "Couldn't set TCP linger: {}", e);
|
log_net!(error "Couldn't set TCP linger: {}", e);
|
||||||
}
|
}
|
||||||
if let Err(e) = socket.set_nodelay(true) {
|
if let Err(e) = socket.set_nodelay(true) {
|
||||||
@ -144,7 +144,7 @@ pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> Result<Socket, S
|
|||||||
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
|
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
|
||||||
.map_err(map_to_string)
|
.map_err(map_to_string)
|
||||||
.map_err(logthru_net!("failed to create TCP socket"))?;
|
.map_err(logthru_net!("failed to create TCP socket"))?;
|
||||||
if let Err(e) = socket.set_linger(None) {
|
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
|
||||||
log_net!(error "Couldn't set TCP linger: {}", e);
|
log_net!(error "Couldn't set TCP linger: {}", e);
|
||||||
}
|
}
|
||||||
if let Err(e) = socket.set_nodelay(true) {
|
if let Err(e) = socket.set_nodelay(true) {
|
||||||
|
@ -107,6 +107,7 @@ impl RawTcpProtocolHandler {
|
|||||||
stream: AsyncPeekStream,
|
stream: AsyncPeekStream,
|
||||||
socket_addr: SocketAddr,
|
socket_addr: SocketAddr,
|
||||||
) -> Result<Option<NetworkConnection>, String> {
|
) -> Result<Option<NetworkConnection>, String> {
|
||||||
|
log_net!("TCP: on_accept_async: enter");
|
||||||
let mut peekbuf: [u8; PEEK_DETECT_LEN] = [0u8; PEEK_DETECT_LEN];
|
let mut peekbuf: [u8; PEEK_DETECT_LEN] = [0u8; PEEK_DETECT_LEN];
|
||||||
let peeklen = stream
|
let peeklen = stream
|
||||||
.peek(&mut peekbuf)
|
.peek(&mut peekbuf)
|
||||||
@ -125,7 +126,7 @@ impl RawTcpProtocolHandler {
|
|||||||
ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)),
|
ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)),
|
||||||
);
|
);
|
||||||
|
|
||||||
log_net!("on_accept_async from: {}", socket_addr);
|
log_net!(debug "TCP: on_accept_async from: {}", socket_addr);
|
||||||
|
|
||||||
Ok(Some(conn))
|
Ok(Some(conn))
|
||||||
}
|
}
|
||||||
|
@ -127,7 +127,9 @@ impl WebsocketProtocolHandler {
|
|||||||
ps: AsyncPeekStream,
|
ps: AsyncPeekStream,
|
||||||
socket_addr: SocketAddr,
|
socket_addr: SocketAddr,
|
||||||
) -> Result<Option<NetworkConnection>, String> {
|
) -> Result<Option<NetworkConnection>, String> {
|
||||||
|
log_net!("WS: on_accept_async: enter");
|
||||||
let request_path_len = self.arc.request_path.len() + 2;
|
let request_path_len = self.arc.request_path.len() + 2;
|
||||||
|
|
||||||
let mut peekbuf: Vec<u8> = vec![0u8; request_path_len];
|
let mut peekbuf: Vec<u8> = vec![0u8; request_path_len];
|
||||||
match io::timeout(
|
match io::timeout(
|
||||||
Duration::from_micros(self.arc.connection_initial_timeout),
|
Duration::from_micros(self.arc.connection_initial_timeout),
|
||||||
@ -143,6 +145,7 @@ impl WebsocketProtocolHandler {
|
|||||||
return Err(e).map_err(map_to_string).map_err(logthru_net!(error));
|
return Err(e).map_err(map_to_string).map_err(logthru_net!(error));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for websocket path
|
// Check for websocket path
|
||||||
let matches_path = &peekbuf[0..request_path_len - 2] == self.arc.request_path.as_slice()
|
let matches_path = &peekbuf[0..request_path_len - 2] == self.arc.request_path.as_slice()
|
||||||
&& (peekbuf[request_path_len - 2] == b' '
|
&& (peekbuf[request_path_len - 2] == b' '
|
||||||
@ -150,14 +153,10 @@ impl WebsocketProtocolHandler {
|
|||||||
&& peekbuf[request_path_len - 1] == b' '));
|
&& peekbuf[request_path_len - 1] == b' '));
|
||||||
|
|
||||||
if !matches_path {
|
if !matches_path {
|
||||||
log_net!(
|
log_net!("WS: not websocket");
|
||||||
"not websocket: request_path: {} peekbuf:{}",
|
|
||||||
std::str::from_utf8(&self.arc.request_path).unwrap(),
|
|
||||||
std::str::from_utf8(&peekbuf).unwrap()
|
|
||||||
);
|
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
log_net!("found websocket");
|
log_net!("WS: found websocket");
|
||||||
|
|
||||||
let ws_stream = accept_async(ps)
|
let ws_stream = accept_async(ps)
|
||||||
.await
|
.await
|
||||||
@ -182,6 +181,8 @@ impl WebsocketProtocolHandler {
|
|||||||
ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new(ws_stream)),
|
ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new(ws_stream)),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
log_net!(debug "{}: on_accept_async from: {}", if self.arc.tls { "WSS" } else { "WS" }, socket_addr);
|
||||||
|
|
||||||
Ok(Some(conn))
|
Ok(Some(conn))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +345,11 @@ impl NetworkInterfaces {
|
|||||||
if changed {
|
if changed {
|
||||||
self.cache_best_addresses();
|
self.cache_best_addresses();
|
||||||
|
|
||||||
trace!("NetworkInterfaces refreshed: {:#?}?", self);
|
//trace!("NetworkInterfaces refreshed: {:#?}?", self);
|
||||||
|
trace!(
|
||||||
|
"NetworkInterfaces refreshed: {:#?}?",
|
||||||
|
self.interface_address_cache
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(changed)
|
Ok(changed)
|
||||||
}
|
}
|
||||||
|
@ -66,10 +66,11 @@ impl Default for NetworkManagerStats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ClientWhitelistEntry {
|
struct ClientWhitelistEntry {
|
||||||
last_seen: u64,
|
last_seen_ts: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mechanism required to contact another node
|
// Mechanism required to contact another node
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
enum ContactMethod {
|
enum ContactMethod {
|
||||||
Unreachable, // Node is not reachable by any means
|
Unreachable, // Node is not reachable by any means
|
||||||
Direct(DialInfo), // Contact the node directly
|
Direct(DialInfo), // Contact the node directly
|
||||||
@ -294,11 +295,11 @@ impl NetworkManager {
|
|||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
match inner.client_whitelist.entry(client) {
|
match inner.client_whitelist.entry(client) {
|
||||||
hashlink::lru_cache::Entry::Occupied(mut entry) => {
|
hashlink::lru_cache::Entry::Occupied(mut entry) => {
|
||||||
entry.get_mut().last_seen = intf::get_timestamp()
|
entry.get_mut().last_seen_ts = intf::get_timestamp()
|
||||||
}
|
}
|
||||||
hashlink::lru_cache::Entry::Vacant(entry) => {
|
hashlink::lru_cache::Entry::Vacant(entry) => {
|
||||||
entry.insert(ClientWhitelistEntry {
|
entry.insert(ClientWhitelistEntry {
|
||||||
last_seen: intf::get_timestamp(),
|
last_seen_ts: intf::get_timestamp(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -309,7 +310,7 @@ impl NetworkManager {
|
|||||||
|
|
||||||
match inner.client_whitelist.entry(client) {
|
match inner.client_whitelist.entry(client) {
|
||||||
hashlink::lru_cache::Entry::Occupied(mut entry) => {
|
hashlink::lru_cache::Entry::Occupied(mut entry) => {
|
||||||
entry.get_mut().last_seen = intf::get_timestamp();
|
entry.get_mut().last_seen_ts = intf::get_timestamp();
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
hashlink::lru_cache::Entry::Vacant(_) => false,
|
hashlink::lru_cache::Entry::Vacant(_) => false,
|
||||||
@ -324,7 +325,7 @@ impl NetworkManager {
|
|||||||
while inner
|
while inner
|
||||||
.client_whitelist
|
.client_whitelist
|
||||||
.peek_lru()
|
.peek_lru()
|
||||||
.map(|v| v.1.last_seen < cutoff_timestamp)
|
.map(|v| v.1.last_seen_ts < cutoff_timestamp)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
{
|
{
|
||||||
inner.client_whitelist.remove_lru();
|
inner.client_whitelist.remove_lru();
|
||||||
@ -441,7 +442,7 @@ impl NetworkManager {
|
|||||||
&self,
|
&self,
|
||||||
expiration_us: u64,
|
expiration_us: u64,
|
||||||
extra_data: D,
|
extra_data: D,
|
||||||
) -> Result<(Vec<u8>, EventualValueCloneFuture<ReceiptEvent>), String> {
|
) -> Result<(Vec<u8>, EventualValueFuture<ReceiptEvent>), String> {
|
||||||
let receipt_manager = self.receipt_manager();
|
let receipt_manager = self.receipt_manager();
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
|
|
||||||
@ -454,7 +455,7 @@ impl NetworkManager {
|
|||||||
|
|
||||||
// Record the receipt for later
|
// Record the receipt for later
|
||||||
let exp_ts = intf::get_timestamp() + expiration_us;
|
let exp_ts = intf::get_timestamp() + expiration_us;
|
||||||
let eventual = SingleShotEventual::new(ReceiptEvent::Cancelled);
|
let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled));
|
||||||
let instance = eventual.instance();
|
let instance = eventual.instance();
|
||||||
receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual);
|
receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual);
|
||||||
|
|
||||||
@ -761,11 +762,12 @@ impl NetworkManager {
|
|||||||
return Ok(ContactMethod::OutboundRelay(relay_node));
|
return Ok(ContactMethod::OutboundRelay(relay_node));
|
||||||
}
|
}
|
||||||
// Otherwise, we can't reach this node
|
// Otherwise, we can't reach this node
|
||||||
debug!(
|
debug!("unable to reach node {:?}", target_node_ref);
|
||||||
"unable to reach node {:?}: {}",
|
// trace!(
|
||||||
target_node_ref,
|
// "unable to reach node {:?}: {}",
|
||||||
target_node_ref.operate(|e| format!("{:#?}", e))
|
// target_node_ref,
|
||||||
);
|
// target_node_ref.operate(|e| format!("{:#?}", e))
|
||||||
|
// );
|
||||||
Ok(ContactMethod::Unreachable)
|
Ok(ContactMethod::Unreachable)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -797,9 +799,8 @@ impl NetworkManager {
|
|||||||
.await
|
.await
|
||||||
.map_err(logthru_net!("failed to send signal to {:?}", relay_nr))
|
.map_err(logthru_net!("failed to send signal to {:?}", relay_nr))
|
||||||
.map_err(map_to_string)?;
|
.map_err(map_to_string)?;
|
||||||
|
|
||||||
// Wait for the return receipt
|
// Wait for the return receipt
|
||||||
let inbound_nr = match eventual_value.await {
|
let inbound_nr = match eventual_value.await.take_value().unwrap() {
|
||||||
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
|
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
|
||||||
ReceiptEvent::Expired => {
|
ReceiptEvent::Expired => {
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
@ -888,7 +889,7 @@ impl NetworkManager {
|
|||||||
.map_err(map_to_string)?;
|
.map_err(map_to_string)?;
|
||||||
|
|
||||||
// Wait for the return receipt
|
// Wait for the return receipt
|
||||||
let inbound_nr = match eventual_value.await {
|
let inbound_nr = match eventual_value.await.take_value().unwrap() {
|
||||||
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
|
ReceiptEvent::Returned(inbound_nr) => inbound_nr,
|
||||||
ReceiptEvent::Expired => {
|
ReceiptEvent::Expired => {
|
||||||
return Err(format!("hole punch receipt expired from {:?}", target_nr));
|
return Err(format!("hole punch receipt expired from {:?}", target_nr));
|
||||||
@ -957,8 +958,13 @@ impl NetworkManager {
|
|||||||
data
|
data
|
||||||
};
|
};
|
||||||
|
|
||||||
|
log_net!("send_data via dialinfo to {:?}", node_ref);
|
||||||
// If we don't have last_connection, try to reach out to the peer via its dial info
|
// If we don't have last_connection, try to reach out to the peer via its dial info
|
||||||
match this.get_contact_method(node_ref).map_err(logthru_net!())? {
|
match this
|
||||||
|
.get_contact_method(node_ref.clone())
|
||||||
|
.map_err(logthru_net!(debug))
|
||||||
|
.map(logthru_net!("get_contact_method for {:?}", node_ref))?
|
||||||
|
{
|
||||||
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
|
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
|
||||||
this.send_data(relay_nr, data)
|
this.send_data(relay_nr, data)
|
||||||
.await
|
.await
|
||||||
@ -985,7 +991,7 @@ impl NetworkManager {
|
|||||||
.map(|_| SendDataKind::GlobalDirect),
|
.map(|_| SendDataKind::GlobalDirect),
|
||||||
ContactMethod::Unreachable => Err("Can't send to this node".to_owned()),
|
ContactMethod::Unreachable => Err("Can't send to this node".to_owned()),
|
||||||
}
|
}
|
||||||
.map_err(logthru_net!())
|
.map_err(logthru_net!(debug))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1122,7 +1128,7 @@ impl NetworkManager {
|
|||||||
|
|
||||||
// Keep relays assigned and accessible
|
// Keep relays assigned and accessible
|
||||||
async fn relay_management_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
async fn relay_management_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
||||||
log_net!("--- network manager relay_management task");
|
// log_net!("--- network manager relay_management task");
|
||||||
|
|
||||||
// Get our node's current node info and network class and do the right thing
|
// Get our node's current node info and network class and do the right thing
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
@ -1174,7 +1180,7 @@ impl NetworkManager {
|
|||||||
|
|
||||||
// Compute transfer statistics for the low level network
|
// Compute transfer statistics for the low level network
|
||||||
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
||||||
log_net!("--- network manager rolling_transfers task");
|
// log_net!("--- network manager rolling_transfers task");
|
||||||
{
|
{
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
|
|
||||||
|
@ -23,6 +23,9 @@ const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
|
|||||||
// remains valid, as well as to make sure we remain in any relay node's routing table
|
// remains valid, as well as to make sure we remain in any relay node's routing table
|
||||||
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 20;
|
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 20;
|
||||||
|
|
||||||
|
// How many times do we try to ping a never-reached node before we call it dead
|
||||||
|
const NEVER_REACHED_PING_COUNT: u32 = 3;
|
||||||
|
|
||||||
// Do not change order here, it will mess up other sorts
|
// Do not change order here, it will mess up other sorts
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
pub enum BucketEntryState {
|
pub enum BucketEntryState {
|
||||||
@ -58,7 +61,6 @@ impl BucketEntry {
|
|||||||
transfer_stats_accounting: TransferStatsAccounting::new(),
|
transfer_stats_accounting: TransferStatsAccounting::new(),
|
||||||
peer_stats: PeerStats {
|
peer_stats: PeerStats {
|
||||||
time_added: now,
|
time_added: now,
|
||||||
last_seen: None,
|
|
||||||
rpc_stats: RPCStats::default(),
|
rpc_stats: RPCStats::default(),
|
||||||
latency: None,
|
latency: None,
|
||||||
transfer: TransferStatsDownUp::default(),
|
transfer: TransferStatsDownUp::default(),
|
||||||
@ -129,7 +131,7 @@ impl BucketEntry {
|
|||||||
|
|
||||||
pub fn has_valid_signed_node_info(&self) -> bool {
|
pub fn has_valid_signed_node_info(&self) -> bool {
|
||||||
if let Some(sni) = &self.opt_signed_node_info {
|
if let Some(sni) = &self.opt_signed_node_info {
|
||||||
sni.signature.valid
|
sni.is_valid()
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
@ -213,8 +215,13 @@ impl BucketEntry {
|
|||||||
|
|
||||||
///// state machine handling
|
///// state machine handling
|
||||||
pub(super) fn check_reliable(&self, cur_ts: u64) -> bool {
|
pub(super) fn check_reliable(&self, cur_ts: u64) -> bool {
|
||||||
// if we have had consecutive ping replies for longer that UNRELIABLE_PING_SPAN_SECS
|
// If we have had any failures to send, this is not reliable
|
||||||
match self.peer_stats.rpc_stats.first_consecutive_answer_time {
|
if self.peer_stats.rpc_stats.failed_to_send > 0 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we have seen the node consistently for longer that UNRELIABLE_PING_SPAN_SECS
|
||||||
|
match self.peer_stats.rpc_stats.first_consecutive_seen_ts {
|
||||||
None => false,
|
None => false,
|
||||||
Some(ts) => {
|
Some(ts) => {
|
||||||
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
|
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
|
||||||
@ -222,10 +229,15 @@ impl BucketEntry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub(super) fn check_dead(&self, cur_ts: u64) -> bool {
|
pub(super) fn check_dead(&self, cur_ts: u64) -> bool {
|
||||||
|
// If we have failured to send NEVER_REACHED_PING_COUNT times in a row, the node is dead
|
||||||
|
if self.peer_stats.rpc_stats.failed_to_send >= NEVER_REACHED_PING_COUNT {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
// if we have not heard from the node at all for the duration of the unreliable ping span
|
// if we have not heard from the node at all for the duration of the unreliable ping span
|
||||||
// a node is not dead if we haven't heard from it yet
|
// a node is not dead if we haven't heard from it yet,
|
||||||
match self.peer_stats.last_seen {
|
// but we give it NEVER_REACHED_PING_COUNT chances to ping before we say it's dead
|
||||||
None => false,
|
match self.peer_stats.rpc_stats.last_seen_ts {
|
||||||
|
None => self.peer_stats.rpc_stats.recent_lost_answers < NEVER_REACHED_PING_COUNT,
|
||||||
Some(ts) => {
|
Some(ts) => {
|
||||||
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
|
cur_ts.saturating_sub(ts) >= (UNRELIABLE_PING_SPAN_SECS as u64 * 1000000u64)
|
||||||
}
|
}
|
||||||
@ -233,9 +245,20 @@ impl BucketEntry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn needs_constant_ping(&self, cur_ts: u64, interval: u64) -> bool {
|
fn needs_constant_ping(&self, cur_ts: u64, interval: u64) -> bool {
|
||||||
match self.peer_stats.last_seen {
|
// If we have not either seen the node, nor asked it a question in the last 'interval'
|
||||||
|
// then we should ping it
|
||||||
|
let latest_contact_time = self
|
||||||
|
.peer_stats
|
||||||
|
.rpc_stats
|
||||||
|
.last_seen_ts
|
||||||
|
.max(self.peer_stats.rpc_stats.last_question);
|
||||||
|
|
||||||
|
match latest_contact_time {
|
||||||
None => true,
|
None => true,
|
||||||
Some(last_seen) => cur_ts.saturating_sub(last_seen) >= (interval * 1000000u64),
|
Some(latest_contact_time) => {
|
||||||
|
// If we haven't done anything with this node in 'interval' seconds
|
||||||
|
cur_ts.saturating_sub(latest_contact_time) >= (interval * 1000000u64)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,19 +282,26 @@ impl BucketEntry {
|
|||||||
match state {
|
match state {
|
||||||
BucketEntryState::Reliable => {
|
BucketEntryState::Reliable => {
|
||||||
// If we are in a reliable state, we need a ping on an exponential scale
|
// If we are in a reliable state, we need a ping on an exponential scale
|
||||||
match self.peer_stats.last_seen {
|
let latest_contact_time = self
|
||||||
None => true,
|
.peer_stats
|
||||||
Some(last_seen) => {
|
.rpc_stats
|
||||||
let first_consecutive_answer_time = self
|
.last_seen_ts
|
||||||
.peer_stats
|
.max(self.peer_stats.rpc_stats.last_question);
|
||||||
.rpc_stats
|
|
||||||
.first_consecutive_answer_time
|
match latest_contact_time {
|
||||||
.unwrap();
|
None => {
|
||||||
let start_of_reliable_time = first_consecutive_answer_time
|
error!("Peer is reliable, but not seen!");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
Some(latest_contact_time) => {
|
||||||
|
let first_consecutive_seen_ts =
|
||||||
|
self.peer_stats.rpc_stats.first_consecutive_seen_ts.unwrap();
|
||||||
|
let start_of_reliable_time = first_consecutive_seen_ts
|
||||||
+ ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64
|
+ ((UNRELIABLE_PING_SPAN_SECS - UNRELIABLE_PING_INTERVAL_SECS) as u64
|
||||||
* 1_000_000u64);
|
* 1_000_000u64);
|
||||||
let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time);
|
let reliable_cur = cur_ts.saturating_sub(start_of_reliable_time);
|
||||||
let reliable_last = last_seen.saturating_sub(start_of_reliable_time);
|
let reliable_last =
|
||||||
|
latest_contact_time.saturating_sub(start_of_reliable_time);
|
||||||
|
|
||||||
retry_falloff_log(
|
retry_falloff_log(
|
||||||
reliable_last,
|
reliable_last,
|
||||||
@ -292,37 +322,44 @@ impl BucketEntry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn touch_last_seen(&mut self, ts: u64) {
|
pub(super) fn touch_last_seen(&mut self, ts: u64) {
|
||||||
// If we've heard from the node at all, we can always restart our lost ping count
|
|
||||||
self.peer_stats.rpc_stats.recent_lost_answers = 0;
|
|
||||||
// Mark the node as seen
|
// Mark the node as seen
|
||||||
self.peer_stats.last_seen = Some(ts);
|
if self
|
||||||
|
.peer_stats
|
||||||
|
.rpc_stats
|
||||||
|
.first_consecutive_seen_ts
|
||||||
|
.is_none()
|
||||||
|
{
|
||||||
|
self.peer_stats.rpc_stats.first_consecutive_seen_ts = Some(ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.peer_stats.rpc_stats.last_seen_ts = Some(ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn state_debug_info(&self, cur_ts: u64) -> String {
|
pub(super) fn state_debug_info(&self, cur_ts: u64) -> String {
|
||||||
let first_consecutive_answer_time = if let Some(first_consecutive_answer_time) =
|
let first_consecutive_seen_ts = if let Some(first_consecutive_seen_ts) =
|
||||||
self.peer_stats.rpc_stats.first_consecutive_answer_time
|
self.peer_stats.rpc_stats.first_consecutive_seen_ts
|
||||||
{
|
{
|
||||||
format!(
|
format!(
|
||||||
"{}s ago",
|
"{}s ago",
|
||||||
timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_answer_time))
|
timestamp_to_secs(cur_ts.saturating_sub(first_consecutive_seen_ts))
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
"never".to_owned()
|
"never".to_owned()
|
||||||
};
|
};
|
||||||
let last_seen = if let Some(last_seen) = self.peer_stats.last_seen {
|
let last_seen_ts_str = if let Some(last_seen_ts) = self.peer_stats.rpc_stats.last_seen_ts {
|
||||||
format!(
|
format!(
|
||||||
"{}s ago",
|
"{}s ago",
|
||||||
timestamp_to_secs(cur_ts.saturating_sub(last_seen))
|
timestamp_to_secs(cur_ts.saturating_sub(last_seen_ts))
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
"never".to_owned()
|
"never".to_owned()
|
||||||
};
|
};
|
||||||
|
|
||||||
format!(
|
format!(
|
||||||
"state: {:?}, first_consecutive_answer_time: {}, last_seen: {}",
|
"state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}",
|
||||||
self.state(cur_ts),
|
self.state(cur_ts),
|
||||||
first_consecutive_answer_time,
|
first_consecutive_seen_ts,
|
||||||
last_seen
|
last_seen_ts_str
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -332,11 +369,10 @@ impl BucketEntry {
|
|||||||
pub(super) fn question_sent(&mut self, ts: u64, bytes: u64, expects_answer: bool) {
|
pub(super) fn question_sent(&mut self, ts: u64, bytes: u64, expects_answer: bool) {
|
||||||
self.transfer_stats_accounting.add_up(bytes);
|
self.transfer_stats_accounting.add_up(bytes);
|
||||||
self.peer_stats.rpc_stats.messages_sent += 1;
|
self.peer_stats.rpc_stats.messages_sent += 1;
|
||||||
|
self.peer_stats.rpc_stats.failed_to_send = 0;
|
||||||
if expects_answer {
|
if expects_answer {
|
||||||
self.peer_stats.rpc_stats.questions_in_flight += 1;
|
self.peer_stats.rpc_stats.questions_in_flight += 1;
|
||||||
}
|
self.peer_stats.rpc_stats.last_question = Some(ts);
|
||||||
if self.peer_stats.last_seen.is_none() {
|
|
||||||
self.peer_stats.last_seen = Some(ts);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) {
|
pub(super) fn question_rcvd(&mut self, ts: u64, bytes: u64) {
|
||||||
@ -344,33 +380,40 @@ impl BucketEntry {
|
|||||||
self.peer_stats.rpc_stats.messages_rcvd += 1;
|
self.peer_stats.rpc_stats.messages_rcvd += 1;
|
||||||
self.touch_last_seen(ts);
|
self.touch_last_seen(ts);
|
||||||
}
|
}
|
||||||
pub(super) fn answer_sent(&mut self, _ts: u64, bytes: u64) {
|
pub(super) fn answer_sent(&mut self, bytes: u64) {
|
||||||
self.transfer_stats_accounting.add_up(bytes);
|
self.transfer_stats_accounting.add_up(bytes);
|
||||||
self.peer_stats.rpc_stats.messages_sent += 1;
|
self.peer_stats.rpc_stats.messages_sent += 1;
|
||||||
|
self.peer_stats.rpc_stats.failed_to_send = 0;
|
||||||
}
|
}
|
||||||
pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
|
pub(super) fn answer_rcvd(&mut self, send_ts: u64, recv_ts: u64, bytes: u64) {
|
||||||
self.transfer_stats_accounting.add_down(bytes);
|
self.transfer_stats_accounting.add_down(bytes);
|
||||||
self.peer_stats.rpc_stats.messages_rcvd += 1;
|
self.peer_stats.rpc_stats.messages_rcvd += 1;
|
||||||
self.peer_stats.rpc_stats.questions_in_flight -= 1;
|
self.peer_stats.rpc_stats.questions_in_flight -= 1;
|
||||||
if self
|
|
||||||
.peer_stats
|
|
||||||
.rpc_stats
|
|
||||||
.first_consecutive_answer_time
|
|
||||||
.is_none()
|
|
||||||
{
|
|
||||||
self.peer_stats.rpc_stats.first_consecutive_answer_time = Some(recv_ts);
|
|
||||||
}
|
|
||||||
self.record_latency(recv_ts - send_ts);
|
self.record_latency(recv_ts - send_ts);
|
||||||
self.touch_last_seen(recv_ts);
|
self.touch_last_seen(recv_ts);
|
||||||
|
self.peer_stats.rpc_stats.recent_lost_answers = 0;
|
||||||
}
|
}
|
||||||
pub(super) fn question_lost(&mut self, _ts: u64) {
|
pub(super) fn question_lost(&mut self) {
|
||||||
self.peer_stats.rpc_stats.first_consecutive_answer_time = None;
|
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
|
||||||
self.peer_stats.rpc_stats.questions_in_flight -= 1;
|
self.peer_stats.rpc_stats.questions_in_flight -= 1;
|
||||||
|
self.peer_stats.rpc_stats.recent_lost_answers += 1;
|
||||||
|
}
|
||||||
|
pub(super) fn failed_to_send(&mut self, ts: u64, expects_answer: bool) {
|
||||||
|
if expects_answer {
|
||||||
|
self.peer_stats.rpc_stats.last_question = Some(ts);
|
||||||
|
}
|
||||||
|
self.peer_stats.rpc_stats.failed_to_send += 1;
|
||||||
|
self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for BucketEntry {
|
impl Drop for BucketEntry {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
assert_eq!(self.ref_count, 0);
|
if self.ref_count != 0 {
|
||||||
|
panic!(
|
||||||
|
"bucket entry dropped with non-zero refcount: {:#?}",
|
||||||
|
self.node_info()
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -75,25 +75,30 @@ impl RoutingTable {
|
|||||||
let mut cnt = 0;
|
let mut cnt = 0;
|
||||||
out += &format!("Entries: {}\n", inner.bucket_entry_count);
|
out += &format!("Entries: {}\n", inner.bucket_entry_count);
|
||||||
while b < blen {
|
while b < blen {
|
||||||
if inner.buckets[b].entries().len() > 0 {
|
let filtered_entries: Vec<(&DHTKey, &BucketEntry)> = inner.buckets[b]
|
||||||
out += &format!(" Bucket #{}:\n", b);
|
.entries()
|
||||||
for e in inner.buckets[b].entries() {
|
.filter(|e| {
|
||||||
let state = e.1.state(cur_ts);
|
let state = e.1.state(cur_ts);
|
||||||
if state >= min_state {
|
state >= min_state
|
||||||
out += &format!(
|
})
|
||||||
" {} [{}]\n",
|
.collect();
|
||||||
e.0.encode(),
|
if !filtered_entries.is_empty() {
|
||||||
match state {
|
out += &format!(" Bucket #{}:\n", b);
|
||||||
BucketEntryState::Reliable => "R",
|
for e in filtered_entries {
|
||||||
BucketEntryState::Unreliable => "U",
|
let state = e.1.state(cur_ts);
|
||||||
BucketEntryState::Dead => "D",
|
out += &format!(
|
||||||
}
|
" {} [{}]\n",
|
||||||
);
|
e.0.encode(),
|
||||||
|
match state {
|
||||||
cnt += 1;
|
BucketEntryState::Reliable => "R",
|
||||||
if cnt >= limit {
|
BucketEntryState::Unreliable => "U",
|
||||||
break;
|
BucketEntryState::Dead => "D",
|
||||||
}
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
cnt += 1;
|
||||||
|
if cnt >= limit {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cnt >= limit {
|
if cnt >= limit {
|
||||||
|
@ -72,10 +72,13 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn filter_has_valid_signed_node_info(kv: &(&DHTKey, Option<&mut BucketEntry>)) -> bool {
|
pub fn filter_has_valid_signed_node_info(
|
||||||
|
kv: &(&DHTKey, Option<&mut BucketEntry>),
|
||||||
|
own_peer_info_is_valid: bool,
|
||||||
|
) -> bool {
|
||||||
match &kv.1 {
|
match &kv.1 {
|
||||||
None => true,
|
None => own_peer_info_is_valid,
|
||||||
Some(b) => b.has_node_info(),
|
Some(b) => b.has_valid_signed_node_info(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,10 +120,11 @@ impl RoutingTable {
|
|||||||
nodes.push(selfkv);
|
nodes.push(selfkv);
|
||||||
}
|
}
|
||||||
// add all nodes from buckets
|
// add all nodes from buckets
|
||||||
|
// Can't use with_entries() here due to lifetime issues
|
||||||
for b in &mut inner.buckets {
|
for b in &mut inner.buckets {
|
||||||
for (k, v) in b.entries_mut() {
|
for (k, v) in b.entries_mut() {
|
||||||
// Don't bother with dead nodes
|
// Don't bother with dead nodes
|
||||||
if !v.check_dead(cur_ts) {
|
if v.state(cur_ts) >= BucketEntryState::Unreliable {
|
||||||
// Apply filter
|
// Apply filter
|
||||||
let kv = (k, Some(v));
|
let kv = (k, Some(v));
|
||||||
if filter(&kv) {
|
if filter(&kv) {
|
||||||
@ -159,13 +163,11 @@ impl RoutingTable {
|
|||||||
// filter
|
// filter
|
||||||
|kv| {
|
|kv| {
|
||||||
if kv.1.is_none() {
|
if kv.1.is_none() {
|
||||||
// filter out self peer, as it is irrelevant to the 'fastest nodes' search
|
// always filter out self peer, as it is irrelevant to the 'fastest nodes' search
|
||||||
return false;
|
false
|
||||||
|
} else {
|
||||||
|
filter.as_ref().map(|f| f(kv)).unwrap_or(true)
|
||||||
}
|
}
|
||||||
if filter.is_some() && !filter.as_ref().unwrap()(kv) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
true
|
|
||||||
},
|
},
|
||||||
// sort
|
// sort
|
||||||
|(a_key, a_entry), (b_key, b_entry)| {
|
|(a_key, a_entry), (b_key, b_entry)| {
|
||||||
@ -237,16 +239,7 @@ impl RoutingTable {
|
|||||||
node_count,
|
node_count,
|
||||||
cur_ts,
|
cur_ts,
|
||||||
// filter
|
// filter
|
||||||
|kv| {
|
|kv| filter.as_ref().map(|f| f(kv)).unwrap_or(true),
|
||||||
if kv.1.is_none() {
|
|
||||||
// include self peer, as it is relevant to the 'closest nodes' search
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if filter.is_some() && !filter.as_ref().unwrap()(kv) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
true
|
|
||||||
},
|
|
||||||
// sort
|
// sort
|
||||||
|(a_key, a_entry), (b_key, b_entry)| {
|
|(a_key, a_entry), (b_key, b_entry)| {
|
||||||
// same nodes are always the same
|
// same nodes are always the same
|
||||||
|
@ -311,11 +311,11 @@ impl RoutingTable {
|
|||||||
|
|
||||||
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
|
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
|
||||||
if matches!(domain, RoutingDomain::PublicInternet) {
|
if matches!(domain, RoutingDomain::PublicInternet) {
|
||||||
for bucket in &mut inner.buckets {
|
let cur_ts = intf::get_timestamp();
|
||||||
for entry in bucket.entries_mut() {
|
Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Dead, |_, e| {
|
||||||
entry.1.set_seen_our_node_info(false);
|
e.set_seen_our_node_info(false);
|
||||||
}
|
Option::<()>::None
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -393,26 +393,18 @@ impl RoutingTable {
|
|||||||
let mut inner = this.inner.lock();
|
let mut inner = this.inner.lock();
|
||||||
let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
||||||
let cur_ts = intf::get_timestamp();
|
let cur_ts = intf::get_timestamp();
|
||||||
for bucket in &mut inner.buckets {
|
Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Unreliable, |k, e| {
|
||||||
for entry in bucket.entries_mut() {
|
// Only update nodes that haven't seen our node info yet
|
||||||
match entry.1.state(cur_ts) {
|
if !e.has_seen_our_node_info() {
|
||||||
BucketEntryState::Reliable | BucketEntryState::Unreliable => {
|
node_refs.push(NodeRef::new(
|
||||||
// Only update nodes that haven't seen our node info yet
|
this.clone(),
|
||||||
if !entry.1.has_seen_our_node_info() {
|
*k,
|
||||||
node_refs.push(NodeRef::new(
|
e,
|
||||||
this.clone(),
|
None,
|
||||||
*entry.0,
|
));
|
||||||
entry.1,
|
|
||||||
None,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
BucketEntryState::Dead => {
|
|
||||||
// do nothing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
Option::<()>::None
|
||||||
|
});
|
||||||
node_refs
|
node_refs
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -458,8 +450,8 @@ impl RoutingTable {
|
|||||||
for bucket in &mut inner.buckets {
|
for bucket in &mut inner.buckets {
|
||||||
bucket.kick(0);
|
bucket.kick(0);
|
||||||
}
|
}
|
||||||
log_rtab!(
|
log_rtab!(debug
|
||||||
"Routing table purge complete. Routing table now has {} nodes",
|
"Routing table purge complete. Routing table now has {} nodes",
|
||||||
inner.bucket_entry_count
|
inner.bucket_entry_count
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -473,7 +465,7 @@ impl RoutingTable {
|
|||||||
if let Some(dead_node_ids) = bucket.kick(bucket_depth) {
|
if let Some(dead_node_ids) = bucket.kick(bucket_depth) {
|
||||||
// Remove counts
|
// Remove counts
|
||||||
inner.bucket_entry_count -= dead_node_ids.len();
|
inner.bucket_entry_count -= dead_node_ids.len();
|
||||||
log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count);
|
log_rtab!(debug "Routing table now has {} nodes", inner.bucket_entry_count);
|
||||||
|
|
||||||
// Now purge the routing table inner vectors
|
// Now purge the routing table inner vectors
|
||||||
//let filter = |k: &DHTKey| dead_node_ids.contains(k);
|
//let filter = |k: &DHTKey| dead_node_ids.contains(k);
|
||||||
@ -490,6 +482,34 @@ impl RoutingTable {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_entry_count(inner: &mut RoutingTableInner, min_state: BucketEntryState) -> usize {
|
||||||
|
let mut count = 0usize;
|
||||||
|
let cur_ts = intf::get_timestamp();
|
||||||
|
Self::with_entries(inner, cur_ts, min_state, |_, _| {
|
||||||
|
count += 1;
|
||||||
|
Option::<()>::None
|
||||||
|
});
|
||||||
|
count
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_entries<T, F: FnMut(&DHTKey, &mut BucketEntry) -> Option<T>>(
|
||||||
|
inner: &mut RoutingTableInner,
|
||||||
|
cur_ts: u64,
|
||||||
|
min_state: BucketEntryState,
|
||||||
|
mut f: F,
|
||||||
|
) -> Option<T> {
|
||||||
|
for bucket in &mut inner.buckets {
|
||||||
|
for entry in bucket.entries_mut() {
|
||||||
|
if entry.1.state(cur_ts) >= min_state {
|
||||||
|
if let Some(out) = f(entry.0, entry.1) {
|
||||||
|
return Some(out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
fn drop_node_ref(&self, node_id: DHTKey) {
|
fn drop_node_ref(&self, node_id: DHTKey) {
|
||||||
// Reduce ref count on entry
|
// Reduce ref count on entry
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
@ -536,7 +556,8 @@ impl RoutingTable {
|
|||||||
None => {
|
None => {
|
||||||
// Make new entry
|
// Make new entry
|
||||||
inner.bucket_entry_count += 1;
|
inner.bucket_entry_count += 1;
|
||||||
log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count);
|
let cnt = inner.bucket_entry_count;
|
||||||
|
log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count(&mut *inner, BucketEntryState::Unreliable));
|
||||||
let bucket = &mut inner.buckets[idx];
|
let bucket = &mut inner.buckets[idx];
|
||||||
let nr = bucket.add_entry(node_id);
|
let nr = bucket.add_entry(node_id);
|
||||||
|
|
||||||
@ -639,38 +660,32 @@ impl RoutingTable {
|
|||||||
let mut best_inbound_relay: Option<NodeRef> = None;
|
let mut best_inbound_relay: Option<NodeRef> = None;
|
||||||
|
|
||||||
// Iterate all known nodes for candidates
|
// Iterate all known nodes for candidates
|
||||||
for b in &mut inner.buckets {
|
Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Unreliable, |k, e| {
|
||||||
for (k, entry) in b.entries_mut() {
|
// Ensure this node is not on our local network
|
||||||
// Ensure it's not dead
|
if !e
|
||||||
if !matches!(entry.state(cur_ts), BucketEntryState::Dead) {
|
.local_node_info()
|
||||||
// Ensure this node is not on our local network
|
.map(|l| l.has_dial_info())
|
||||||
if !entry
|
.unwrap_or(false)
|
||||||
.local_node_info()
|
{
|
||||||
.map(|l| l.has_dial_info())
|
// Ensure we have the node's status
|
||||||
.unwrap_or(false)
|
if let Some(node_status) = &e.peer_stats().status {
|
||||||
{
|
// Ensure the node will relay
|
||||||
// Ensure we have the node's status
|
if node_status.will_relay {
|
||||||
if let Some(node_status) = &entry.peer_stats().status {
|
if let Some(best_inbound_relay) = best_inbound_relay.as_mut() {
|
||||||
// Ensure the node will relay
|
if best_inbound_relay
|
||||||
if node_status.will_relay {
|
.operate(|best| BucketEntry::cmp_fastest_reliable(cur_ts, best, e))
|
||||||
if let Some(best_inbound_relay) = best_inbound_relay.as_mut() {
|
== std::cmp::Ordering::Greater
|
||||||
if best_inbound_relay.operate(|best| {
|
{
|
||||||
BucketEntry::cmp_fastest_reliable(cur_ts, best, entry)
|
*best_inbound_relay = NodeRef::new(self.clone(), *k, e, None);
|
||||||
}) == std::cmp::Ordering::Greater
|
|
||||||
{
|
|
||||||
*best_inbound_relay =
|
|
||||||
NodeRef::new(self.clone(), *k, entry, None);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
best_inbound_relay =
|
|
||||||
Some(NodeRef::new(self.clone(), *k, entry, None));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
best_inbound_relay = Some(NodeRef::new(self.clone(), *k, e, None));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Option::<()>::None
|
||||||
|
});
|
||||||
|
|
||||||
best_inbound_relay
|
best_inbound_relay
|
||||||
}
|
}
|
||||||
@ -771,11 +786,105 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resolve_bootstrap(&self, bootstrap: Vec<String>) -> Result<Vec<String>, String> {
|
// Bootstrap lookup process
|
||||||
let mut out = Vec::<String>::new();
|
async fn resolve_bootstrap(&self, bootstrap: Vec<String>) -> Result<Vec<NodeDialInfo>, String> {
|
||||||
|
let mut out = Vec::<NodeDialInfo>::new();
|
||||||
|
|
||||||
|
// Resolve from bootstrap root to bootstrap hostnames
|
||||||
|
let mut bsnames = Vec::<String>::new();
|
||||||
for bh in bootstrap {
|
for bh in bootstrap {
|
||||||
//
|
// Get TXT record for bootstrap (bootstrap.veilid.net, or similar)
|
||||||
|
let records = intf::txt_lookup(&bh).await?;
|
||||||
|
for record in records {
|
||||||
|
// Split the bootstrap name record by commas
|
||||||
|
for rec in record.split(',') {
|
||||||
|
let rec = rec.trim();
|
||||||
|
// If the name specified is fully qualified, go with it
|
||||||
|
let bsname = if rec.ends_with('.') {
|
||||||
|
rec.to_string()
|
||||||
|
}
|
||||||
|
// If the name is not fully qualified, prepend it to the bootstrap name
|
||||||
|
else {
|
||||||
|
format!("{}.{}", rec, bh)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Add to the list of bootstrap name to look up
|
||||||
|
bsnames.push(bsname);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get bootstrap nodes from hostnames concurrently
|
||||||
|
let mut unord = FuturesUnordered::new();
|
||||||
|
for bsname in bsnames {
|
||||||
|
unord.push(async move {
|
||||||
|
// look up boostrap node txt records
|
||||||
|
let bsnirecords = match intf::txt_lookup(&bsname).await {
|
||||||
|
Err(e) => {
|
||||||
|
warn!("bootstrap node txt lookup failed for {}: {}", bsname, e);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Ok(v) => v,
|
||||||
|
};
|
||||||
|
// for each record resolve into node dial info strings
|
||||||
|
let mut nodedialinfos: Vec<NodeDialInfo> = Vec::new();
|
||||||
|
for bsnirecord in bsnirecords {
|
||||||
|
// split bootstrap node record by commas. example:
|
||||||
|
// 7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,tcp://bootstrap-dev-alpha.veilid.net:5150,udp://bootstrap-dev-alpha.veilid.net:5150,ws://bootstrap-dev-alpha.veilid.net:5150/ws
|
||||||
|
let mut records = bsnirecord.split(',').map(|x| x.trim());
|
||||||
|
let node_id_str = match records.next() {
|
||||||
|
Some(v) => v,
|
||||||
|
None => {
|
||||||
|
warn!("no node id specified in bootstrap node txt record");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Decode the node id
|
||||||
|
let node_id_key = match DHTKey::try_decode(node_id_str) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"Invalid node id in bootstrap node record {}: {}",
|
||||||
|
node_id_str, e
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node
|
||||||
|
if self.node_id() == node_id_key {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve each record and store in node dial infos list
|
||||||
|
let node_id = NodeId::new(node_id_key);
|
||||||
|
for rec in records {
|
||||||
|
let rec = rec.trim();
|
||||||
|
let dial_infos = match DialInfo::try_vec_from_url(rec) {
|
||||||
|
Ok(dis) => dis,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Couldn't resolve bootstrap node dial info {}: {}", rec, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for dial_info in dial_infos {
|
||||||
|
nodedialinfos.push(NodeDialInfo {
|
||||||
|
node_id: node_id.clone(),
|
||||||
|
dial_info,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(nodedialinfos)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
while let Some(ndis) = unord.next().await {
|
||||||
|
if let Some(mut ndis) = ndis {
|
||||||
|
out.append(&mut ndis);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(out)
|
Ok(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -791,8 +900,18 @@ impl RoutingTable {
|
|||||||
log_rtab!("--- bootstrap_task");
|
log_rtab!("--- bootstrap_task");
|
||||||
|
|
||||||
// If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s)
|
// If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s)
|
||||||
let bootstrap_nodes = if !bootstrap_nodes.is_empty() {
|
let bootstrap_node_dial_infos = if !bootstrap_nodes.is_empty() {
|
||||||
bootstrap_nodes
|
let mut bsnvec = Vec::new();
|
||||||
|
for b in bootstrap_nodes {
|
||||||
|
let ndis = NodeDialInfo::from_str(b.as_str())
|
||||||
|
.map_err(map_to_string)
|
||||||
|
.map_err(logthru_rtab!(
|
||||||
|
"Invalid node dial info in bootstrap entry: {}",
|
||||||
|
b
|
||||||
|
))?;
|
||||||
|
bsnvec.push(ndis);
|
||||||
|
}
|
||||||
|
bsnvec
|
||||||
} else {
|
} else {
|
||||||
// Resolve bootstrap servers and recurse their TXT entries
|
// Resolve bootstrap servers and recurse their TXT entries
|
||||||
self.resolve_bootstrap(bootstrap).await?
|
self.resolve_bootstrap(bootstrap).await?
|
||||||
@ -800,16 +919,13 @@ impl RoutingTable {
|
|||||||
|
|
||||||
// Map all bootstrap entries to a single key with multiple dialinfo
|
// Map all bootstrap entries to a single key with multiple dialinfo
|
||||||
let mut bsmap: BTreeMap<DHTKey, Vec<DialInfoDetail>> = BTreeMap::new();
|
let mut bsmap: BTreeMap<DHTKey, Vec<DialInfoDetail>> = BTreeMap::new();
|
||||||
for b in bootstrap_nodes {
|
for ndi in bootstrap_node_dial_infos {
|
||||||
let ndis = NodeDialInfo::from_str(b.as_str())
|
let node_id = ndi.node_id.key;
|
||||||
.map_err(map_to_string)
|
|
||||||
.map_err(logthru_rtab!("Invalid dial info in bootstrap entry: {}", b))?;
|
|
||||||
let node_id = ndis.node_id.key;
|
|
||||||
bsmap
|
bsmap
|
||||||
.entry(node_id)
|
.entry(node_id)
|
||||||
.or_insert_with(Vec::new)
|
.or_insert_with(Vec::new)
|
||||||
.push(DialInfoDetail {
|
.push(DialInfoDetail {
|
||||||
dial_info: ndis.dial_info,
|
dial_info: ndi.dial_info,
|
||||||
class: DialInfoClass::Direct, // Bootstraps are always directly reachable
|
class: DialInfoClass::Direct, // Bootstraps are always directly reachable
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -846,13 +962,15 @@ impl RoutingTable {
|
|||||||
"bootstrap at {:?} did not return valid signed node info",
|
"bootstrap at {:?} did not return valid signed node info",
|
||||||
nr
|
nr
|
||||||
);
|
);
|
||||||
// xxx: delete the node?
|
// If this node info is invalid, it will time out after being unpingable
|
||||||
} else {
|
} else {
|
||||||
// otherwise this bootstrap is valid, lets ask it to find ourselves now
|
// otherwise this bootstrap is valid, lets ask it to find ourselves now
|
||||||
this.reverse_find_node(nr, true).await
|
this.reverse_find_node(nr, true).await
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for all bootstrap operations to complete before we complete the singlefuture
|
||||||
while unord.next().await.is_some() {}
|
while unord.next().await.is_some() {}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -865,15 +983,20 @@ impl RoutingTable {
|
|||||||
async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> {
|
async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> {
|
||||||
log_rtab!("--- peer_minimum_refresh task");
|
log_rtab!("--- peer_minimum_refresh task");
|
||||||
|
|
||||||
// get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too
|
// get list of all peers we know about, even the unreliable ones, and ask them to find nodes close to our node too
|
||||||
let noderefs = {
|
let noderefs = {
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
let mut noderefs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
let mut noderefs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
||||||
for b in &mut inner.buckets {
|
let cur_ts = intf::get_timestamp();
|
||||||
for (k, entry) in b.entries_mut() {
|
Self::with_entries(
|
||||||
noderefs.push(NodeRef::new(self.clone(), *k, entry, None))
|
&mut *inner,
|
||||||
}
|
cur_ts,
|
||||||
}
|
BucketEntryState::Unreliable,
|
||||||
|
|k, entry| {
|
||||||
|
noderefs.push(NodeRef::new(self.clone(), *k, entry, None));
|
||||||
|
Option::<()>::None
|
||||||
|
},
|
||||||
|
);
|
||||||
noderefs
|
noderefs
|
||||||
};
|
};
|
||||||
log_rtab!(" refreshing with nodes: {:?}", noderefs);
|
log_rtab!(" refreshing with nodes: {:?}", noderefs);
|
||||||
@ -892,32 +1015,31 @@ impl RoutingTable {
|
|||||||
// Ping each node in the routing table if they need to be pinged
|
// Ping each node in the routing table if they need to be pinged
|
||||||
// to determine their reliability
|
// to determine their reliability
|
||||||
async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
||||||
log_rtab!("--- ping_validator task");
|
// log_rtab!("--- ping_validator task");
|
||||||
|
|
||||||
let rpc = self.rpc_processor();
|
let rpc = self.rpc_processor();
|
||||||
let netman = self.network_manager();
|
let netman = self.network_manager();
|
||||||
let relay_node_id = netman.relay_node().map(|nr| nr.node_id());
|
let relay_node_id = netman.relay_node().map(|nr| nr.node_id());
|
||||||
|
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
for b in &mut inner.buckets {
|
Self::with_entries(&mut *inner, cur_ts, BucketEntryState::Unreliable, |k, e| {
|
||||||
for (k, entry) in b.entries_mut() {
|
if e.needs_ping(k, cur_ts, relay_node_id) {
|
||||||
if entry.needs_ping(k, cur_ts, relay_node_id) {
|
let nr = NodeRef::new(self.clone(), *k, e, None);
|
||||||
let nr = NodeRef::new(self.clone(), *k, entry, None);
|
log_rtab!(
|
||||||
log_rtab!(
|
" --- ping validating: {:?} ({})",
|
||||||
" --- ping validating: {:?} ({})",
|
nr,
|
||||||
nr,
|
e.state_debug_info(cur_ts)
|
||||||
entry.state_debug_info(cur_ts)
|
);
|
||||||
);
|
intf::spawn_local(rpc.clone().rpc_call_status(nr)).detach();
|
||||||
intf::spawn_local(rpc.clone().rpc_call_status(nr)).detach();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
Option::<()>::None
|
||||||
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute transfer statistics to determine how 'fast' a node is
|
// Compute transfer statistics to determine how 'fast' a node is
|
||||||
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
|
||||||
log_rtab!("--- rolling_transfers task");
|
// log_rtab!("--- rolling_transfers task");
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
|
|
||||||
// Roll our own node's transfers
|
// Roll our own node's transfers
|
||||||
@ -940,8 +1062,8 @@ impl RoutingTable {
|
|||||||
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
|
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
|
||||||
self.unlocked_inner.rolling_transfers_task.tick().await?;
|
self.unlocked_inner.rolling_transfers_task.tick().await?;
|
||||||
|
|
||||||
// If routing table is empty, then add the bootstrap nodes to it
|
// If routing table has no live entries, then add the bootstrap nodes to it
|
||||||
if self.inner.lock().bucket_entry_count == 0 {
|
if Self::get_entry_count(&mut *self.inner.lock(), BucketEntryState::Unreliable) == 0 {
|
||||||
self.unlocked_inner.bootstrap_task.tick().await?;
|
self.unlocked_inner.bootstrap_task.tick().await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -950,7 +1072,9 @@ impl RoutingTable {
|
|||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
c.network.dht.min_peer_count as usize
|
c.network.dht.min_peer_count as usize
|
||||||
};
|
};
|
||||||
if self.inner.lock().bucket_entry_count < min_peer_count {
|
if Self::get_entry_count(&mut *self.inner.lock(), BucketEntryState::Unreliable)
|
||||||
|
< min_peer_count
|
||||||
|
{
|
||||||
self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
|
self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
|
||||||
}
|
}
|
||||||
// Ping validate some nodes to groom the table
|
// Ping validate some nodes to groom the table
|
||||||
@ -987,13 +1111,13 @@ impl RoutingTable {
|
|||||||
e.question_rcvd(ts, bytes);
|
e.question_rcvd(ts, bytes);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
pub fn stats_answer_sent(&self, node_ref: NodeRef, ts: u64, bytes: u64) {
|
pub fn stats_answer_sent(&self, node_ref: NodeRef, bytes: u64) {
|
||||||
self.inner
|
self.inner
|
||||||
.lock()
|
.lock()
|
||||||
.self_transfer_stats_accounting
|
.self_transfer_stats_accounting
|
||||||
.add_up(bytes);
|
.add_up(bytes);
|
||||||
node_ref.operate(|e| {
|
node_ref.operate(|e| {
|
||||||
e.answer_sent(ts, bytes);
|
e.answer_sent(bytes);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
pub fn stats_answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) {
|
pub fn stats_answer_rcvd(&self, node_ref: NodeRef, send_ts: u64, recv_ts: u64, bytes: u64) {
|
||||||
@ -1009,9 +1133,14 @@ impl RoutingTable {
|
|||||||
e.answer_rcvd(send_ts, recv_ts, bytes);
|
e.answer_rcvd(send_ts, recv_ts, bytes);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
pub fn stats_question_lost(&self, node_ref: NodeRef, ts: u64) {
|
pub fn stats_question_lost(&self, node_ref: NodeRef) {
|
||||||
node_ref.operate(|e| {
|
node_ref.operate(|e| {
|
||||||
e.question_lost(ts);
|
e.question_lost();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub fn stats_failed_to_send(&self, node_ref: NodeRef, ts: u64, expects_answer: bool) {
|
||||||
|
node_ref.operate(|e| {
|
||||||
|
e.failed_to_send(ts, expects_answer);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,8 +214,8 @@ impl NodeRef {
|
|||||||
// Get the last connection and the last time we saw anything with this connection
|
// Get the last connection and the last time we saw anything with this connection
|
||||||
let (last_connection, last_seen) = self.operate(|e| {
|
let (last_connection, last_seen) = self.operate(|e| {
|
||||||
if let Some((last_connection, connection_ts)) = e.last_connection() {
|
if let Some((last_connection, connection_ts)) = e.last_connection() {
|
||||||
if let Some(last_seen) = e.peer_stats().last_seen {
|
if let Some(last_seen_ts) = e.peer_stats().rpc_stats.last_seen_ts {
|
||||||
Some((last_connection, u64::max(last_seen, connection_ts)))
|
Some((last_connection, u64::max(last_seen_ts, connection_ts)))
|
||||||
} else {
|
} else {
|
||||||
Some((last_connection, connection_ts))
|
Some((last_connection, connection_ts))
|
||||||
}
|
}
|
||||||
|
@ -88,6 +88,7 @@ macro_rules! map_error_panic {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RPCProcessor {
|
impl RPCProcessor {
|
||||||
|
#[allow(dead_code)]
|
||||||
pub(super) fn get_rpc_request_debug_info<T: capnp::message::ReaderSegments>(
|
pub(super) fn get_rpc_request_debug_info<T: capnp::message::ReaderSegments>(
|
||||||
&self,
|
&self,
|
||||||
dest: &Destination,
|
dest: &Destination,
|
||||||
@ -104,6 +105,7 @@ impl RPCProcessor {
|
|||||||
self.get_rpc_message_debug_info(message)
|
self.get_rpc_message_debug_info(message)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
#[allow(dead_code)]
|
||||||
pub(super) fn get_rpc_reply_debug_info<T: capnp::message::ReaderSegments>(
|
pub(super) fn get_rpc_reply_debug_info<T: capnp::message::ReaderSegments>(
|
||||||
&self,
|
&self,
|
||||||
request_rpcreader: &RPCMessageReader,
|
request_rpcreader: &RPCMessageReader,
|
||||||
|
@ -348,16 +348,12 @@ impl RPCProcessor {
|
|||||||
.map_err(map_error_internal!("invalid timeout"))?;
|
.map_err(map_error_internal!("invalid timeout"))?;
|
||||||
// wait for eventualvalue
|
// wait for eventualvalue
|
||||||
let start_ts = get_timestamp();
|
let start_ts = get_timestamp();
|
||||||
timeout(timeout_ms, waitable_reply.eventual.instance())
|
let res = timeout(timeout_ms, waitable_reply.eventual.instance())
|
||||||
.await
|
.await
|
||||||
.map_err(|_| RPCError::Timeout)?;
|
.map_err(|_| RPCError::Timeout)?;
|
||||||
match waitable_reply.eventual.take_value() {
|
let rpcreader = res.take_value().unwrap();
|
||||||
None => panic!("there should be a reply value but there wasn't"),
|
let end_ts = get_timestamp();
|
||||||
Some(rpcreader) => {
|
Ok((rpcreader, end_ts - start_ts))
|
||||||
let end_ts = get_timestamp();
|
|
||||||
Ok((rpcreader, end_ts - start_ts))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
async fn wait_for_reply(
|
async fn wait_for_reply(
|
||||||
&self,
|
&self,
|
||||||
@ -369,7 +365,7 @@ impl RPCProcessor {
|
|||||||
self.cancel_op_id_waiter(waitable_reply.op_id);
|
self.cancel_op_id_waiter(waitable_reply.op_id);
|
||||||
|
|
||||||
self.routing_table()
|
self.routing_table()
|
||||||
.stats_question_lost(waitable_reply.node_ref.clone(), waitable_reply.send_ts);
|
.stats_question_lost(waitable_reply.node_ref.clone());
|
||||||
}
|
}
|
||||||
Ok((rpcreader, _)) => {
|
Ok((rpcreader, _)) => {
|
||||||
// Note that we definitely received this node info since we got a reply
|
// Note that we definitely received this node info since we got a reply
|
||||||
@ -396,7 +392,7 @@ impl RPCProcessor {
|
|||||||
message: capnp::message::Reader<T>,
|
message: capnp::message::Reader<T>,
|
||||||
safety_route_spec: Option<&SafetyRouteSpec>,
|
safety_route_spec: Option<&SafetyRouteSpec>,
|
||||||
) -> Result<Option<WaitableReply>, RPCError> {
|
) -> Result<Option<WaitableReply>, RPCError> {
|
||||||
log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec));
|
//log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec));
|
||||||
|
|
||||||
let (op_id, wants_answer) = {
|
let (op_id, wants_answer) = {
|
||||||
let operation = message
|
let operation = message
|
||||||
@ -539,6 +535,7 @@ impl RPCProcessor {
|
|||||||
|
|
||||||
// send question
|
// send question
|
||||||
let bytes = out.len() as u64;
|
let bytes = out.len() as u64;
|
||||||
|
let send_ts = get_timestamp();
|
||||||
let send_data_kind = match self
|
let send_data_kind = match self
|
||||||
.network_manager()
|
.network_manager()
|
||||||
.send_envelope(node_ref.clone(), Some(out_node_id), out)
|
.send_envelope(node_ref.clone(), Some(out_node_id), out)
|
||||||
@ -552,12 +549,15 @@ impl RPCProcessor {
|
|||||||
if eventual.is_some() {
|
if eventual.is_some() {
|
||||||
self.cancel_op_id_waiter(op_id);
|
self.cancel_op_id_waiter(op_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.routing_table()
|
||||||
|
.stats_failed_to_send(node_ref, send_ts, wants_answer);
|
||||||
|
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Successfully sent
|
// Successfully sent
|
||||||
let send_ts = get_timestamp();
|
|
||||||
self.routing_table()
|
self.routing_table()
|
||||||
.stats_question_sent(node_ref.clone(), send_ts, bytes, wants_answer);
|
.stats_question_sent(node_ref.clone(), send_ts, bytes, wants_answer);
|
||||||
|
|
||||||
@ -586,7 +586,7 @@ impl RPCProcessor {
|
|||||||
reply_msg: capnp::message::Reader<T>,
|
reply_msg: capnp::message::Reader<T>,
|
||||||
safety_route_spec: Option<&SafetyRouteSpec>,
|
safety_route_spec: Option<&SafetyRouteSpec>,
|
||||||
) -> Result<(), RPCError> {
|
) -> Result<(), RPCError> {
|
||||||
log_rpc!(self.get_rpc_reply_debug_info(&request_rpcreader, &reply_msg, &safety_route_spec));
|
// log_rpc!(self.get_rpc_reply_debug_info(&request_rpcreader, &reply_msg, &safety_route_spec));
|
||||||
|
|
||||||
//
|
//
|
||||||
let out_node_id;
|
let out_node_id;
|
||||||
@ -721,16 +721,19 @@ impl RPCProcessor {
|
|||||||
|
|
||||||
// Send the reply
|
// Send the reply
|
||||||
let bytes = out.len() as u64;
|
let bytes = out.len() as u64;
|
||||||
|
let send_ts = get_timestamp();
|
||||||
self.network_manager()
|
self.network_manager()
|
||||||
.send_envelope(node_ref.clone(), Some(out_node_id), out)
|
.send_envelope(node_ref.clone(), Some(out_node_id), out)
|
||||||
.await
|
.await
|
||||||
.map_err(RPCError::Internal)?;
|
.map_err(RPCError::Internal)
|
||||||
|
.map_err(|e| {
|
||||||
|
self.routing_table()
|
||||||
|
.stats_failed_to_send(node_ref.clone(), send_ts, false);
|
||||||
|
e
|
||||||
|
})?;
|
||||||
|
|
||||||
// Reply successfully sent
|
// Reply successfully sent
|
||||||
let send_ts = get_timestamp();
|
self.routing_table().stats_answer_sent(node_ref, bytes);
|
||||||
|
|
||||||
self.routing_table()
|
|
||||||
.stats_answer_sent(node_ref, send_ts, bytes);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -982,10 +985,14 @@ impl RPCProcessor {
|
|||||||
|
|
||||||
// find N nodes closest to the target node in our routing table
|
// find N nodes closest to the target node in our routing table
|
||||||
let own_peer_info = routing_table.get_own_peer_info();
|
let own_peer_info = routing_table.get_own_peer_info();
|
||||||
|
let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid();
|
||||||
|
|
||||||
let closest_nodes = routing_table.find_closest_nodes(
|
let closest_nodes = routing_table.find_closest_nodes(
|
||||||
target_node_id,
|
target_node_id,
|
||||||
// filter
|
// filter
|
||||||
Some(Box::new(RoutingTable::filter_has_valid_signed_node_info)),
|
Some(Box::new(move |kv| {
|
||||||
|
RoutingTable::filter_has_valid_signed_node_info(kv, own_peer_info_is_valid)
|
||||||
|
})),
|
||||||
// transform
|
// transform
|
||||||
|e| RoutingTable::transform_to_peer_info(e, &own_peer_info),
|
|e| RoutingTable::transform_to_peer_info(e, &own_peer_info),
|
||||||
);
|
);
|
||||||
@ -1569,7 +1576,7 @@ impl RPCProcessor {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Wait for receipt
|
// Wait for receipt
|
||||||
match eventual_value.await {
|
match eventual_value.await.take_value().unwrap() {
|
||||||
ReceiptEvent::Returned(_) => Ok(true),
|
ReceiptEvent::Returned(_) => Ok(true),
|
||||||
ReceiptEvent::Expired => Ok(false),
|
ReceiptEvent::Expired => Ok(false),
|
||||||
ReceiptEvent::Cancelled => {
|
ReceiptEvent::Cancelled => {
|
||||||
|
@ -342,6 +342,9 @@ pub struct NodeInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl NodeInfo {
|
impl NodeInfo {
|
||||||
|
pub fn is_valid(&self) -> bool {
|
||||||
|
!matches!(self.network_class, NetworkClass::Invalid)
|
||||||
|
}
|
||||||
pub fn first_filtered_dial_info_detail<F>(&self, filter: F) -> Option<DialInfoDetail>
|
pub fn first_filtered_dial_info_detail<F>(&self, filter: F) -> Option<DialInfoDetail>
|
||||||
where
|
where
|
||||||
F: Fn(&DialInfoDetail) -> bool,
|
F: Fn(&DialInfoDetail) -> bool,
|
||||||
@ -1036,8 +1039,9 @@ impl DialInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn try_vec_from_url(url: String) -> Result<Vec<Self>, VeilidAPIError> {
|
pub fn try_vec_from_url<S: AsRef<str>>(url: S) -> Result<Vec<Self>, VeilidAPIError> {
|
||||||
let split_url = SplitUrl::from_str(&url)
|
let url = url.as_ref();
|
||||||
|
let split_url = SplitUrl::from_str(url)
|
||||||
.map_err(|e| parse_error!(format!("unable to split url: {}", e), url))?;
|
.map_err(|e| parse_error!(format!("unable to split url: {}", e), url))?;
|
||||||
|
|
||||||
let port = match split_url.scheme.as_str() {
|
let port = match split_url.scheme.as_str() {
|
||||||
@ -1070,11 +1074,11 @@ impl DialInfo {
|
|||||||
"tcp" => Self::tcp_from_socketaddr(sa),
|
"tcp" => Self::tcp_from_socketaddr(sa),
|
||||||
"ws" => Self::try_ws(
|
"ws" => Self::try_ws(
|
||||||
SocketAddress::from_socket_addr(sa).to_canonical(),
|
SocketAddress::from_socket_addr(sa).to_canonical(),
|
||||||
url.clone(),
|
url.to_string(),
|
||||||
)?,
|
)?,
|
||||||
"wss" => Self::try_wss(
|
"wss" => Self::try_wss(
|
||||||
SocketAddress::from_socket_addr(sa).to_canonical(),
|
SocketAddress::from_socket_addr(sa).to_canonical(),
|
||||||
url.clone(),
|
url.to_string(),
|
||||||
)?,
|
)?,
|
||||||
_ => {
|
_ => {
|
||||||
unreachable!("Invalid dial info url scheme")
|
unreachable!("Invalid dial info url scheme")
|
||||||
@ -1202,6 +1206,10 @@ impl SignedNodeInfo {
|
|||||||
timestamp: intf::get_timestamp(),
|
timestamp: intf::get_timestamp(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_valid(&self) -> bool {
|
||||||
|
self.signature.valid && self.node_info.is_valid()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
@ -1281,7 +1289,7 @@ impl MatchesDialInfoFilter for ConnectionDescriptor {
|
|||||||
if !self.matches_peer_scope(filter.peer_scope) {
|
if !self.matches_peer_scope(filter.peer_scope) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if filter.protocol_set.contains(self.protocol_type()) {
|
if !filter.protocol_set.contains(self.protocol_type()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if let Some(at) = filter.address_type {
|
if let Some(at) = filter.address_type {
|
||||||
@ -1356,19 +1364,20 @@ pub struct RPCStats {
|
|||||||
pub messages_sent: u32, // number of rpcs that have been sent in the total_time range
|
pub messages_sent: u32, // number of rpcs that have been sent in the total_time range
|
||||||
pub messages_rcvd: u32, // number of rpcs that have been received in the total_time range
|
pub messages_rcvd: u32, // number of rpcs that have been received in the total_time range
|
||||||
pub questions_in_flight: u32, // number of questions issued that have yet to be answered
|
pub questions_in_flight: u32, // number of questions issued that have yet to be answered
|
||||||
//pub last_question: Option<u64>, // when the peer was last questioned and we want an answer
|
pub last_question: Option<u64>, // when the peer was last questioned (either successfully or not) and we wanted an answer
|
||||||
pub first_consecutive_answer_time: Option<u64>, // the timestamp of the first answer in a series of consecutive questions
|
pub last_seen_ts: Option<u64>, // when the peer was last seen for any reason, including when we first attempted to reach out to it
|
||||||
|
pub first_consecutive_seen_ts: Option<u64>, // the timestamp of the first consecutive proof-of-life for this node (an answer or received question)
|
||||||
pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability
|
pub recent_lost_answers: u32, // number of answers that have been lost since we lost reliability
|
||||||
|
pub failed_to_send: u32, // number of messages that have failed to send since we last successfully sent one
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
|
||||||
pub struct PeerStats {
|
pub struct PeerStats {
|
||||||
pub time_added: u64, // when the peer was added to the routing table
|
pub time_added: u64, // when the peer was added to the routing table
|
||||||
pub last_seen: Option<u64>, // when the peer was last seen for any reason, including when we first attempted to reach out to it
|
pub rpc_stats: RPCStats, // information about RPCs
|
||||||
pub rpc_stats: RPCStats, // information about RPCs
|
|
||||||
pub latency: Option<LatencyStats>, // latencies for communications with the peer
|
pub latency: Option<LatencyStats>, // latencies for communications with the peer
|
||||||
pub transfer: TransferStatsDownUp, // Stats for communications with the peer
|
pub transfer: TransferStatsDownUp, // Stats for communications with the peer
|
||||||
pub status: Option<NodeStatus>, // Last known node status
|
pub status: Option<NodeStatus>, // Last known node status
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
|
@ -296,6 +296,8 @@ impl VeilidConfig {
|
|||||||
get_config!(inner.network.max_connections_per_ip6_prefix_size);
|
get_config!(inner.network.max_connections_per_ip6_prefix_size);
|
||||||
get_config!(inner.network.max_connection_frequency_per_min);
|
get_config!(inner.network.max_connection_frequency_per_min);
|
||||||
get_config!(inner.network.client_whitelist_timeout_ms);
|
get_config!(inner.network.client_whitelist_timeout_ms);
|
||||||
|
get_config!(inner.network.reverse_connection_receipt_time_ms);
|
||||||
|
get_config!(inner.network.hole_punch_receipt_time_ms);
|
||||||
get_config!(inner.network.bootstrap);
|
get_config!(inner.network.bootstrap);
|
||||||
get_config!(inner.network.bootstrap_nodes);
|
get_config!(inner.network.bootstrap_nodes);
|
||||||
get_config!(inner.network.routing_table.limit_over_attached);
|
get_config!(inner.network.routing_table.limit_over_attached);
|
||||||
|
@ -20,6 +20,96 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////
|
||||||
|
///
|
||||||
|
|
||||||
|
pub struct Peek<'a> {
|
||||||
|
aps: AsyncPeekStream,
|
||||||
|
buf: &'a mut [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Unpin for Peek<'_> {}
|
||||||
|
|
||||||
|
impl Future for Peek<'_> {
|
||||||
|
type Output = std::io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let this = &mut *self;
|
||||||
|
|
||||||
|
let mut inner = this.aps.inner.lock();
|
||||||
|
let inner = &mut *inner;
|
||||||
|
//
|
||||||
|
let buf_len = this.buf.len();
|
||||||
|
let mut copy_len = buf_len;
|
||||||
|
if buf_len > inner.peekbuf_len {
|
||||||
|
//
|
||||||
|
inner.peekbuf.resize(buf_len, 0u8);
|
||||||
|
let mut read_future = inner
|
||||||
|
.stream
|
||||||
|
.read(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]);
|
||||||
|
let read_len = match Pin::new(&mut read_future).poll(cx) {
|
||||||
|
Poll::Pending => {
|
||||||
|
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
|
Poll::Ready(Err(e)) => {
|
||||||
|
return Poll::Ready(Err(e));
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(v)) => v,
|
||||||
|
};
|
||||||
|
inner.peekbuf_len += read_len;
|
||||||
|
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
||||||
|
copy_len = inner.peekbuf_len;
|
||||||
|
}
|
||||||
|
this.buf[..copy_len].copy_from_slice(&inner.peekbuf[..copy_len]);
|
||||||
|
Poll::Ready(Ok(copy_len))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
////////
|
||||||
|
///
|
||||||
|
|
||||||
|
pub struct PeekExact<'a> {
|
||||||
|
aps: AsyncPeekStream,
|
||||||
|
buf: &'a mut [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Unpin for PeekExact<'_> {}
|
||||||
|
|
||||||
|
impl Future for PeekExact<'_> {
|
||||||
|
type Output = std::io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let this = &mut *self;
|
||||||
|
|
||||||
|
let mut inner = this.aps.inner.lock();
|
||||||
|
let inner = &mut *inner;
|
||||||
|
//
|
||||||
|
let buf_len = this.buf.len();
|
||||||
|
let mut copy_len = buf_len;
|
||||||
|
if buf_len > inner.peekbuf_len {
|
||||||
|
//
|
||||||
|
inner.peekbuf.resize(buf_len, 0u8);
|
||||||
|
let mut read_future = inner
|
||||||
|
.stream
|
||||||
|
.read_exact(&mut inner.peekbuf.as_mut_slice()[inner.peekbuf_len..buf_len]);
|
||||||
|
match Pin::new(&mut read_future).poll(cx) {
|
||||||
|
Poll::Pending => {
|
||||||
|
inner.peekbuf.resize(inner.peekbuf_len, 0u8);
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
|
Poll::Ready(Err(e)) => {
|
||||||
|
return Poll::Ready(Err(e));
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(())) => (),
|
||||||
|
};
|
||||||
|
inner.peekbuf_len = buf_len;
|
||||||
|
copy_len = inner.peekbuf_len;
|
||||||
|
}
|
||||||
|
this.buf[..copy_len].copy_from_slice(&inner.peekbuf[..copy_len]);
|
||||||
|
Poll::Ready(Ok(copy_len))
|
||||||
|
}
|
||||||
|
}
|
||||||
/////////
|
/////////
|
||||||
///
|
///
|
||||||
struct AsyncPeekStreamInner {
|
struct AsyncPeekStreamInner {
|
||||||
@ -50,60 +140,18 @@ impl AsyncPeekStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn peek(&'_ self, buf: &'_ mut [u8]) -> Result<usize> {
|
pub fn peek<'a>(&'a self, buf: &'a mut [u8]) -> Peek<'a> {
|
||||||
let (mut stream, mut peekbuf, mut peekbuf_len) = {
|
Peek::<'a> {
|
||||||
let inner = self.inner.lock();
|
aps: self.clone(),
|
||||||
(
|
buf,
|
||||||
inner.stream.clone_stream(),
|
|
||||||
inner.peekbuf.clone(),
|
|
||||||
inner.peekbuf_len,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
//
|
|
||||||
let buf_len = buf.len();
|
|
||||||
let mut copy_len = buf_len;
|
|
||||||
if buf_len > peekbuf_len {
|
|
||||||
//
|
|
||||||
peekbuf.resize(buf_len, 0u8);
|
|
||||||
let read_len = stream
|
|
||||||
.read(&mut peekbuf.as_mut_slice()[peekbuf_len..buf_len])
|
|
||||||
.await?;
|
|
||||||
peekbuf_len += read_len;
|
|
||||||
copy_len = peekbuf_len;
|
|
||||||
}
|
}
|
||||||
buf[..copy_len].copy_from_slice(&peekbuf[..copy_len]);
|
|
||||||
|
|
||||||
let mut inner = self.inner.lock();
|
|
||||||
inner.peekbuf = peekbuf;
|
|
||||||
inner.peekbuf_len = peekbuf_len;
|
|
||||||
Ok(copy_len)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn peek_exact(&'_ self, buf: &'_ mut [u8]) -> Result<()> {
|
pub fn peek_exact<'a>(&'a self, buf: &'a mut [u8]) -> PeekExact<'a> {
|
||||||
let (mut stream, mut peekbuf, mut peekbuf_len) = {
|
PeekExact::<'a> {
|
||||||
let inner = self.inner.lock();
|
aps: self.clone(),
|
||||||
(
|
buf,
|
||||||
inner.stream.clone_stream(),
|
|
||||||
inner.peekbuf.clone(),
|
|
||||||
inner.peekbuf_len,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
//
|
|
||||||
let buf_len = buf.len();
|
|
||||||
if buf_len > peekbuf_len {
|
|
||||||
//
|
|
||||||
peekbuf.resize(buf_len, 0u8);
|
|
||||||
stream
|
|
||||||
.read_exact(&mut peekbuf.as_mut_slice()[peekbuf_len..buf_len])
|
|
||||||
.await?;
|
|
||||||
peekbuf_len = buf_len;
|
|
||||||
}
|
}
|
||||||
buf.copy_from_slice(&peekbuf[..buf_len]);
|
|
||||||
|
|
||||||
let mut inner = self.inner.lock();
|
|
||||||
inner.peekbuf = peekbuf;
|
|
||||||
inner.peekbuf_len = peekbuf_len;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ pub struct EventualValueFuture<T: Unpin> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Unpin> Future for EventualValueFuture<T> {
|
impl<T: Unpin> Future for EventualValueFuture<T> {
|
||||||
type Output = ();
|
type Output = EventualValue<T>;
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
|
||||||
let this = &mut *self;
|
let this = &mut *self;
|
||||||
let out = {
|
let out = {
|
||||||
@ -76,7 +76,7 @@ impl<T: Unpin> Future for EventualValueFuture<T> {
|
|||||||
for w in wakers {
|
for w in wakers {
|
||||||
w.wake();
|
w.wake();
|
||||||
}
|
}
|
||||||
task::Poll::<Self::Output>::Ready(())
|
task::Poll::<Self::Output>::Ready(this.eventual.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,6 +97,14 @@ macro_rules! log_rtab {
|
|||||||
(warn $fmt:literal, $($arg:expr),+) => {
|
(warn $fmt:literal, $($arg:expr),+) => {
|
||||||
warn!(target:"rtab", $fmt, $($arg),+);
|
warn!(target:"rtab", $fmt, $($arg),+);
|
||||||
};
|
};
|
||||||
|
(debug $text:expr) => { debug!(
|
||||||
|
target: "rtab",
|
||||||
|
"{}",
|
||||||
|
$text,
|
||||||
|
)};
|
||||||
|
(debug $fmt:literal, $($arg:expr),+) => {
|
||||||
|
debug!(target:"rtab", $fmt, $($arg),+);
|
||||||
|
};
|
||||||
($text:expr) => {trace!(
|
($text:expr) => {trace!(
|
||||||
target: "rtab",
|
target: "rtab",
|
||||||
"{}",
|
"{}",
|
||||||
@ -230,7 +238,7 @@ macro_rules! logthru {
|
|||||||
(error $target:literal) => (|e__| {
|
(error $target:literal) => (|e__| {
|
||||||
error!(
|
error!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}]",
|
"[{:?}]",
|
||||||
e__,
|
e__,
|
||||||
);
|
);
|
||||||
e__
|
e__
|
||||||
@ -238,7 +246,7 @@ macro_rules! logthru {
|
|||||||
(error $target:literal, $text:literal) => (|e__| {
|
(error $target:literal, $text:literal) => (|e__| {
|
||||||
error!(
|
error!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}] {}",
|
"[{:?}] {}",
|
||||||
e__,
|
e__,
|
||||||
$text
|
$text
|
||||||
);
|
);
|
||||||
@ -247,7 +255,7 @@ macro_rules! logthru {
|
|||||||
(error $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
(error $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
||||||
error!(
|
error!(
|
||||||
target: $target,
|
target: $target,
|
||||||
concat!("[{}] ", $fmt),
|
concat!("[{:?}] ", $fmt),
|
||||||
e__,
|
e__,
|
||||||
$($arg),+
|
$($arg),+
|
||||||
);
|
);
|
||||||
@ -257,7 +265,7 @@ macro_rules! logthru {
|
|||||||
(warn $target:literal) => (|e__| {
|
(warn $target:literal) => (|e__| {
|
||||||
warn!(
|
warn!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}]",
|
"[{:?}]",
|
||||||
e__,
|
e__,
|
||||||
);
|
);
|
||||||
e__
|
e__
|
||||||
@ -265,7 +273,7 @@ macro_rules! logthru {
|
|||||||
(warn $target:literal, $text:literal) => (|e__| {
|
(warn $target:literal, $text:literal) => (|e__| {
|
||||||
warn!(
|
warn!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}] {}",
|
"[{:?}] {}",
|
||||||
e__,
|
e__,
|
||||||
$text
|
$text
|
||||||
);
|
);
|
||||||
@ -274,7 +282,7 @@ macro_rules! logthru {
|
|||||||
(warn $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
(warn $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
||||||
warn!(
|
warn!(
|
||||||
target: $target,
|
target: $target,
|
||||||
concat!("[{}] ", $fmt),
|
concat!("[{:?}] ", $fmt),
|
||||||
e__,
|
e__,
|
||||||
$($arg),+
|
$($arg),+
|
||||||
);
|
);
|
||||||
@ -284,7 +292,7 @@ macro_rules! logthru {
|
|||||||
(debug $target:literal) => (|e__| {
|
(debug $target:literal) => (|e__| {
|
||||||
debug!(
|
debug!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}]",
|
"[{:?}]",
|
||||||
e__,
|
e__,
|
||||||
);
|
);
|
||||||
e__
|
e__
|
||||||
@ -292,7 +300,7 @@ macro_rules! logthru {
|
|||||||
(debug $target:literal, $text:literal) => (|e__| {
|
(debug $target:literal, $text:literal) => (|e__| {
|
||||||
debug!(
|
debug!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}] {}",
|
"[{:?}] {}",
|
||||||
e__,
|
e__,
|
||||||
$text
|
$text
|
||||||
);
|
);
|
||||||
@ -301,7 +309,7 @@ macro_rules! logthru {
|
|||||||
(debug $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
(debug $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
||||||
debug!(
|
debug!(
|
||||||
target: $target,
|
target: $target,
|
||||||
concat!("[{}] ", $fmt),
|
concat!("[{:?}] ", $fmt),
|
||||||
e__,
|
e__,
|
||||||
$($arg),+
|
$($arg),+
|
||||||
);
|
);
|
||||||
@ -311,7 +319,7 @@ macro_rules! logthru {
|
|||||||
($target:literal) => (|e__| {
|
($target:literal) => (|e__| {
|
||||||
trace!(
|
trace!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}]",
|
"[{:?}]",
|
||||||
e__,
|
e__,
|
||||||
);
|
);
|
||||||
e__
|
e__
|
||||||
@ -319,7 +327,7 @@ macro_rules! logthru {
|
|||||||
($target:literal, $text:literal) => (|e__| {
|
($target:literal, $text:literal) => (|e__| {
|
||||||
trace!(
|
trace!(
|
||||||
target: $target,
|
target: $target,
|
||||||
"[{}] {}",
|
"[{:?}] {}",
|
||||||
e__,
|
e__,
|
||||||
$text
|
$text
|
||||||
);
|
);
|
||||||
@ -328,7 +336,7 @@ macro_rules! logthru {
|
|||||||
($target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
($target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
|
||||||
trace!(
|
trace!(
|
||||||
target: $target,
|
target: $target,
|
||||||
concat!("[{}] ", $fmt),
|
concat!("[{:?}] ", $fmt),
|
||||||
e__,
|
e__,
|
||||||
$($arg),+
|
$($arg),+
|
||||||
);
|
);
|
||||||
|
@ -2,38 +2,43 @@ use super::*;
|
|||||||
|
|
||||||
pub struct SingleShotEventual<T>
|
pub struct SingleShotEventual<T>
|
||||||
where
|
where
|
||||||
T: Unpin + Clone,
|
T: Unpin,
|
||||||
{
|
{
|
||||||
eventual: EventualValueClone<T>,
|
eventual: EventualValue<T>,
|
||||||
drop_value: T,
|
drop_value: Option<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Drop for SingleShotEventual<T>
|
impl<T> Drop for SingleShotEventual<T>
|
||||||
where
|
where
|
||||||
T: Unpin + Clone,
|
T: Unpin,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.eventual.resolve(self.drop_value.clone());
|
if let Some(drop_value) = self.drop_value.take() {
|
||||||
|
self.eventual.resolve(drop_value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> SingleShotEventual<T>
|
impl<T> SingleShotEventual<T>
|
||||||
where
|
where
|
||||||
T: Unpin + Clone,
|
T: Unpin,
|
||||||
{
|
{
|
||||||
pub fn new(drop_value: T) -> Self {
|
pub fn new(drop_value: Option<T>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
eventual: EventualValueClone::new(),
|
eventual: EventualValue::new(),
|
||||||
drop_value,
|
drop_value,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Can only call this once, it consumes the eventual
|
// Can only call this once, it consumes the eventual
|
||||||
pub fn resolve(self, value: T) -> EventualResolvedFuture<EventualValueClone<T>> {
|
pub fn resolve(mut self, value: T) -> EventualResolvedFuture<EventualValue<T>> {
|
||||||
|
// If we resolve, we don't want to resolve again to the drop value
|
||||||
|
self.drop_value = None;
|
||||||
|
// Resolve to the specified value
|
||||||
self.eventual.resolve(value)
|
self.eventual.resolve(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn instance(&self) -> EventualValueCloneFuture<T> {
|
pub fn instance(&self) -> EventualValueFuture<T> {
|
||||||
self.eventual.instance()
|
self.eventual.instance()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
|
|||||||
holePunchReceiptTimeMs: 5000,
|
holePunchReceiptTimeMs: 5000,
|
||||||
nodeId: "",
|
nodeId: "",
|
||||||
nodeIdSecret: "",
|
nodeIdSecret: "",
|
||||||
bootstrap: [],
|
bootstrap: ["bootstrap-dev.veilid.net"],
|
||||||
bootstrapNodes: [],
|
bootstrapNodes: [],
|
||||||
routingTable: VeilidConfigRoutingTable(
|
routingTable: VeilidConfigRoutingTable(
|
||||||
limitOverAttached: 64,
|
limitOverAttached: 64,
|
||||||
|
@ -63,7 +63,7 @@ core:
|
|||||||
hole_punch_receipt_time_ms: 5000
|
hole_punch_receipt_time_ms: 5000
|
||||||
node_id: ''
|
node_id: ''
|
||||||
node_id_secret: ''
|
node_id_secret: ''
|
||||||
bootstrap: ['bootstrap.veilid.net']
|
bootstrap: ['bootstrap-dev.veilid.net']
|
||||||
bootstrap_nodes: []
|
bootstrap_nodes: []
|
||||||
routing_table:
|
routing_table:
|
||||||
limit_over_attached: 64
|
limit_over_attached: 64
|
||||||
|
Loading…
Reference in New Issue
Block a user