fix binding issues

This commit is contained in:
John Smith 2023-06-21 13:40:12 -04:00
parent 2272b8e139
commit 754abf2135
13 changed files with 121 additions and 71 deletions

7
Cargo.lock generated
View File

@ -2071,6 +2071,12 @@ dependencies = [
"spin 0.9.8", "spin 0.9.8",
] ]
[[package]]
name = "fn_name"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528a0eb35b41b895aef1afed5ab28659084118e730edc72f033e76fb71666dbb"
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -6591,6 +6597,7 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"console_error_panic_hook", "console_error_panic_hook",
"eyre", "eyre",
"fn_name",
"futures-util", "futures-util",
"jni 0.21.1", "jni 0.21.1",
"jni-sys", "jni-sys",

View File

@ -107,14 +107,21 @@ impl CommandProcessor {
self.ui_sender().add_node_event( self.ui_sender().add_node_event(
Level::Info, Level::Info,
r#"Commands: r#"Commands:
exit/quit - exit the client exit/quit exit the client
disconnect - disconnect the client from the Veilid node disconnect disconnect the client from the Veilid node
shutdown - shut the server down shutdown shut the server down
attach - attach the server to the Veilid network attach attach the server to the Veilid network
detach - detach the server from the Veilid network detach detach the server from the Veilid network
debug - send a debugging command to the Veilid server debug [command] send a debugging command to the Veilid server
change_log_level - change the log level for a tracing layer change_log_level <layer> <level> change the log level for a tracing layer
reply - reply to an AppCall not handled directly by the server layers include:
all, terminal, system, api, file, otlp
levels include:
error, warn, info, debug, trace
reply <call id> <message> reply to an AppCall not handled directly by the server
<call id> must be exact call id reported in VeilidUpdate
<message> can be a string (left trimmed) or
it can start with a '#' followed by a string of undelimited hex bytes
"# "#
.to_owned(), .to_owned(),
); );
@ -188,7 +195,7 @@ reply - reply to an AppCall not handled directly by the server
spawn_detached_local(async move { spawn_detached_local(async move {
match capi.server_debug(rest.unwrap_or_default()).await { match capi.server_debug(rest.unwrap_or_default()).await {
Ok(output) => { Ok(output) => {
ui.add_node_event(Level::Debug, output); ui.add_node_event(Level::Info, output);
ui.send_callback(callback); ui.send_callback(callback);
} }
Err(e) => { Err(e) => {

View File

@ -196,6 +196,7 @@ impl AttachmentManager {
if let Err(err) = netman.startup().await { if let Err(err) = netman.startup().await {
error!("network startup failed: {}", err); error!("network startup failed: {}", err);
netman.shutdown().await; netman.shutdown().await;
restart = true;
break; break;
} }

View File

@ -906,7 +906,7 @@ impl NetworkManager {
return Ok(NetworkResult::timeout()); return Ok(NetworkResult::timeout());
} }
ReceiptEvent::Cancelled => { ReceiptEvent::Cancelled => {
bail!("reverse connect receipt cancelled from {:?}", target_nr); return Ok(NetworkResult::no_connection_other(format!("reverse connect receipt cancelled from {}", target_nr)))
} }
}; };

View File

@ -115,6 +115,15 @@ impl Network {
// tcp_stream.peer_addr().unwrap(), // tcp_stream.peer_addr().unwrap(),
// ); // );
if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(debug "Couldn't set TCP linger: {}", e);
return;
}
if let Err(e) = tcp_stream.set_nodelay(true) {
log_net!(debug "Couldn't set TCP nodelay: {}", e);
return;
}
let listener_state = listener_state.clone(); let listener_state = listener_state.clone();
let connection_manager = connection_manager.clone(); let connection_manager = connection_manager.clone();

View File

@ -33,7 +33,7 @@ cfg_if! {
} }
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", ret)]
pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> { pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
if domain == Domain::IPV6 { if domain == Domain::IPV6 {
@ -49,7 +49,7 @@ pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
Ok(socket) Ok(socket)
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", ret)]
pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result<Socket> { pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address); let domain = Domain::for_address(local_address);
let socket = new_unbound_shared_udp_socket(domain)?; let socket = new_unbound_shared_udp_socket(domain)?;
@ -61,7 +61,7 @@ pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result<Sock
Ok(socket) Ok(socket)
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", ret)]
pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result<Socket> { pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address); let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
@ -95,7 +95,7 @@ pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result<Socke
Ok(socket) Ok(socket)
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", ret)]
pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> { pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
@ -117,7 +117,7 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
Ok(socket) Ok(socket)
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", ret)]
pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> { pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address); let domain = Domain::for_address(local_address);
let socket = new_unbound_shared_tcp_socket(domain)?; let socket = new_unbound_shared_tcp_socket(domain)?;
@ -129,7 +129,7 @@ pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Sock
Ok(socket) Ok(socket)
} }
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", ret)]
pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> { pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address); let domain = Domain::for_address(local_address);

View File

@ -243,7 +243,24 @@ impl Network {
bail!("No valid listen address: {}", listen_address); bail!("No valid listen address: {}", listen_address);
} }
let port = sockaddrs[0].port(); let port = sockaddrs[0].port();
if !self.bind_first_tcp_port(port) {
let mut attempts = 10;
let mut success = false;
while attempts >= 0 {
if self.bind_first_tcp_port(port) {
success = true;
break;
}
attempts -= 1;
// Wait 5 seconds before trying again
log_net!(debug
"Binding TCP port at {} failed, waiting. Attempts remaining = {}",
port, attempts
);
sleep(5000).await
}
if !success {
bail!("Could not find free tcp port to listen on"); bail!("Could not find free tcp port to listen on");
} }
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect())) Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))

View File

@ -5,7 +5,7 @@ impl RoutingTable {
pub fn find_all_closest_peers(&self, key: TypedKey) -> NetworkResult<Vec<PeerInfo>> { pub fn find_all_closest_peers(&self, key: TypedKey) -> NetworkResult<Vec<PeerInfo>> {
let Some(own_peer_info) = self.get_own_peer_info(RoutingDomain::PublicInternet) else { let Some(own_peer_info) = self.get_own_peer_info(RoutingDomain::PublicInternet) else {
// Our own node info is not yet available, drop this request. // Our own node info is not yet available, drop this request.
return NetworkResult::service_unavailable(); return NetworkResult::service_unavailable("Not finding closest peers because our peer info is not yet available");
}; };
// find N nodes closest to the target node in our routing table // find N nodes closest to the target node in our routing table

View File

@ -591,20 +591,8 @@ impl VeilidAPI {
.map_err(VeilidAPIError::internal)? .map_err(VeilidAPIError::internal)?
{ {
NetworkResult::Value(v) => v, NetworkResult::Value(v) => v,
NetworkResult::Timeout => { r => {
return Ok("Timeout".to_owned()); return Ok(r.to_string());
}
NetworkResult::ServiceUnavailable => {
return Ok("ServiceUnavailable".to_owned());
}
NetworkResult::NoConnection(e) => {
return Ok(format!("NoConnection({})", e));
}
NetworkResult::AlreadyExists(e) => {
return Ok(format!("AlreadyExists({})", e));
}
NetworkResult::InvalidMessage(e) => {
return Ok(format!("InvalidMessage({})", e));
} }
}; };

View File

@ -148,7 +148,10 @@ impl RoutingContext {
let answer = match rpc_processor.rpc_call_app_call(dest, request).await { let answer = match rpc_processor.rpc_call_app_call(dest, request).await {
Ok(NetworkResult::Value(v)) => v, Ok(NetworkResult::Value(v)) => v,
Ok(NetworkResult::Timeout) => apibail_timeout!(), Ok(NetworkResult::Timeout) => apibail_timeout!(),
Ok(NetworkResult::ServiceUnavailable) => apibail_try_again!(), Ok(NetworkResult::ServiceUnavailable(e)) => {
log_network_result!(format!("app_call: ServiceUnavailable({})", e));
apibail_try_again!()
}
Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => { Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
apibail_no_connection!(e); apibail_no_connection!(e);
} }
@ -173,7 +176,10 @@ impl RoutingContext {
match rpc_processor.rpc_call_app_message(dest, message).await { match rpc_processor.rpc_call_app_message(dest, message).await {
Ok(NetworkResult::Value(())) => {} Ok(NetworkResult::Value(())) => {}
Ok(NetworkResult::Timeout) => apibail_timeout!(), Ok(NetworkResult::Timeout) => apibail_timeout!(),
Ok(NetworkResult::ServiceUnavailable) => apibail_try_again!(), Ok(NetworkResult::ServiceUnavailable(e)) => {
log_network_result!(format!("app_message: ServiceUnavailable({})", e));
apibail_try_again!()
}
Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => { Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
apibail_no_connection!(e); apibail_no_connection!(e);
} }

View File

@ -35,6 +35,7 @@ stop-token = { version = "^0", default-features = false }
rand = "^0.7" rand = "^0.7"
rust-fsm = "^0" rust-fsm = "^0"
backtrace = "^0" backtrace = "^0"
fn_name = "^0"
# Dependencies for native builds only # Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android # Linux, Windows, Mac, iOS, Android

View File

@ -134,6 +134,8 @@ use parking_lot::*;
use stop_token::*; use stop_token::*;
use thiserror::Error as ThisError; use thiserror::Error as ThisError;
pub use fn_name;
// For iOS tests // For iOS tests
#[no_mangle] #[no_mangle]
pub extern "C" fn main_rs() {} pub extern "C" fn main_rs() {}

View File

@ -68,7 +68,7 @@ impl<T, E> NetworkResultResultExt<T, E> for NetworkResult<Result<T, E>> {
fn into_result_network_result(self) -> Result<NetworkResult<T>, E> { fn into_result_network_result(self) -> Result<NetworkResult<T>, E> {
match self { match self {
NetworkResult::Timeout => Ok(NetworkResult::<T>::Timeout), NetworkResult::Timeout => Ok(NetworkResult::<T>::Timeout),
NetworkResult::ServiceUnavailable => Ok(NetworkResult::<T>::ServiceUnavailable), NetworkResult::ServiceUnavailable(s) => Ok(NetworkResult::<T>::ServiceUnavailable(s)),
NetworkResult::NoConnection(e) => Ok(NetworkResult::<T>::NoConnection(e)), NetworkResult::NoConnection(e) => Ok(NetworkResult::<T>::NoConnection(e)),
NetworkResult::AlreadyExists(e) => Ok(NetworkResult::<T>::AlreadyExists(e)), NetworkResult::AlreadyExists(e) => Ok(NetworkResult::<T>::AlreadyExists(e)),
NetworkResult::InvalidMessage(s) => Ok(NetworkResult::<T>::InvalidMessage(s)), NetworkResult::InvalidMessage(s) => Ok(NetworkResult::<T>::InvalidMessage(s)),
@ -161,7 +161,7 @@ impl<T> FoldedNetworkResultExt<T> for io::Result<NetworkResult<T>> {
#[must_use] #[must_use]
pub enum NetworkResult<T> { pub enum NetworkResult<T> {
Timeout, Timeout,
ServiceUnavailable, ServiceUnavailable(String),
NoConnection(io::Error), NoConnection(io::Error),
AlreadyExists(io::Error), AlreadyExists(io::Error),
InvalidMessage(String), InvalidMessage(String),
@ -172,8 +172,8 @@ impl<T> NetworkResult<T> {
pub fn timeout() -> Self { pub fn timeout() -> Self {
Self::Timeout Self::Timeout
} }
pub fn service_unavailable() -> Self { pub fn service_unavailable<S: ToString>(s: S) -> Self {
Self::ServiceUnavailable Self::ServiceUnavailable(s.to_string())
} }
pub fn no_connection(e: io::Error) -> Self { pub fn no_connection(e: io::Error) -> Self {
Self::NoConnection(e) Self::NoConnection(e)
@ -206,7 +206,7 @@ impl<T> NetworkResult<T> {
pub fn map<X, F: Fn(T) -> X>(self, f: F) -> NetworkResult<X> { pub fn map<X, F: Fn(T) -> X>(self, f: F) -> NetworkResult<X> {
match self { match self {
Self::Timeout => NetworkResult::<X>::Timeout, Self::Timeout => NetworkResult::<X>::Timeout,
Self::ServiceUnavailable => NetworkResult::<X>::ServiceUnavailable, Self::ServiceUnavailable(s) => NetworkResult::<X>::ServiceUnavailable(s),
Self::NoConnection(e) => NetworkResult::<X>::NoConnection(e), Self::NoConnection(e) => NetworkResult::<X>::NoConnection(e),
Self::AlreadyExists(e) => NetworkResult::<X>::AlreadyExists(e), Self::AlreadyExists(e) => NetworkResult::<X>::AlreadyExists(e),
Self::InvalidMessage(s) => NetworkResult::<X>::InvalidMessage(s), Self::InvalidMessage(s) => NetworkResult::<X>::InvalidMessage(s),
@ -216,9 +216,9 @@ impl<T> NetworkResult<T> {
pub fn into_result(self) -> Result<T, io::Error> { pub fn into_result(self) -> Result<T, io::Error> {
match self { match self {
Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")), Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")),
Self::ServiceUnavailable => Err(io::Error::new( Self::ServiceUnavailable(s) => Err(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
"Service unavailable", format!("Service unavailable: {}", s),
)), )),
Self::NoConnection(e) => Err(e), Self::NoConnection(e) => Err(e),
Self::AlreadyExists(e) => Err(e), Self::AlreadyExists(e) => Err(e),
@ -244,7 +244,7 @@ impl<T: Debug> Debug for NetworkResult<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self { match self {
Self::Timeout => write!(f, "Timeout"), Self::Timeout => write!(f, "Timeout"),
Self::ServiceUnavailable => write!(f, "ServiceUnavailable"), Self::ServiceUnavailable(s) => f.debug_tuple("ServiceUnavailable").field(s).finish(),
Self::NoConnection(e) => f.debug_tuple("NoConnection").field(e).finish(), Self::NoConnection(e) => f.debug_tuple("NoConnection").field(e).finish(),
Self::AlreadyExists(e) => f.debug_tuple("AlreadyExists").field(e).finish(), Self::AlreadyExists(e) => f.debug_tuple("AlreadyExists").field(e).finish(),
Self::InvalidMessage(s) => f.debug_tuple("InvalidMessage").field(s).finish(), Self::InvalidMessage(s) => f.debug_tuple("InvalidMessage").field(s).finish(),
@ -257,7 +257,7 @@ impl<T> Display for NetworkResult<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self { match self {
Self::Timeout => write!(f, "Timeout"), Self::Timeout => write!(f, "Timeout"),
Self::ServiceUnavailable => write!(f, "ServiceUnavailable"), Self::ServiceUnavailable(s) => write!(f, "ServiceUnavailable({})", s),
Self::NoConnection(e) => write!(f, "NoConnection({})", e.kind()), Self::NoConnection(e) => write!(f, "NoConnection({})", e.kind()),
Self::AlreadyExists(e) => write!(f, "AlreadyExists({})", e.kind()), Self::AlreadyExists(e) => write!(f, "AlreadyExists({})", e.kind()),
Self::InvalidMessage(s) => write!(f, "InvalidMessage({})", s), Self::InvalidMessage(s) => write!(f, "InvalidMessage({})", s),
@ -276,7 +276,7 @@ macro_rules! network_result_try {
($r: expr) => { ($r: expr) => {
match $r { match $r {
NetworkResult::Timeout => return Ok(NetworkResult::Timeout), NetworkResult::Timeout => return Ok(NetworkResult::Timeout),
NetworkResult::ServiceUnavailable => return Ok(NetworkResult::ServiceUnavailable), NetworkResult::ServiceUnavailable(s) => return Ok(NetworkResult::ServiceUnavailable(s)),
NetworkResult::NoConnection(e) => return Ok(NetworkResult::NoConnection(e)), NetworkResult::NoConnection(e) => return Ok(NetworkResult::NoConnection(e)),
NetworkResult::AlreadyExists(e) => return Ok(NetworkResult::AlreadyExists(e)), NetworkResult::AlreadyExists(e) => return Ok(NetworkResult::AlreadyExists(e)),
NetworkResult::InvalidMessage(s) => return Ok(NetworkResult::InvalidMessage(s)), NetworkResult::InvalidMessage(s) => return Ok(NetworkResult::InvalidMessage(s)),
@ -289,9 +289,9 @@ macro_rules! network_result_try {
$f; $f;
return Ok(NetworkResult::Timeout); return Ok(NetworkResult::Timeout);
} }
NetworkResult::ServiceUnavailable => { NetworkResult::ServiceUnavailable(s) => {
$f; $f;
return Ok(NetworkResult::ServiceUnavailable); return Ok(NetworkResult::ServiceUnavailable(s));
} }
NetworkResult::NoConnection(e) => { NetworkResult::NoConnection(e) => {
$f; $f;
@ -313,22 +313,22 @@ macro_rules! network_result_try {
#[macro_export] #[macro_export]
macro_rules! log_network_result { macro_rules! log_network_result {
($text:expr) => { ($text:expr) => {
cfg_if::cfg_if! { // cfg_if::cfg_if! {
if #[cfg(debug_assertions)] { // if #[cfg(debug_assertions)] {
info!(target: "network_result", "{}", $text) // info!(target: "network_result", "{}", $text)
} else { // } else {
debug!(target: "network_result", "{}", $text) debug!(target: "network_result", "{}", $text)
} // }
} // }
}; };
($fmt:literal, $($arg:expr),+) => { ($fmt:literal, $($arg:expr),+) => {
cfg_if::cfg_if! { // cfg_if::cfg_if! {
if #[cfg(debug_assertions)] { // if #[cfg(debug_assertions)] {
info!(target: "network_result", $fmt, $($arg),+); // info!(target: "network_result", $fmt, $($arg),+);
} else { // } else {
debug!(target: "network_result", $fmt, $($arg),+); debug!(target: "network_result", $fmt, $($arg),+);
} // }
} // }
}; };
} }
@ -337,49 +337,61 @@ macro_rules! network_result_value_or_log {
($r: expr => $f:tt) => { ($r: expr => $f:tt) => {
match $r { match $r {
NetworkResult::Timeout => { NetworkResult::Timeout => {
log_network_result!("{} at {}@{}:{}", "Timeout", file!(), line!(), column!());
$f
}
NetworkResult::ServiceUnavailable => {
log_network_result!( log_network_result!(
"{} at {}@{}:{}", "{} at {}@{}:{} in {}",
"ServiceUnavailable", "Timeout",
file!(), file!(),
line!(), line!(),
column!() column!(),
fn_name::uninstantiated!()
);
$f
}
NetworkResult::ServiceUnavailable(s) => {
log_network_result!(
"{}({}) at {}@{}:{} in {}",
"ServiceUnavailable",
s,
file!(),
line!(),
column!(),
fn_name::uninstantiated!()
); );
$f $f
} }
NetworkResult::NoConnection(e) => { NetworkResult::NoConnection(e) => {
log_network_result!( log_network_result!(
"{}({}) at {}@{}:{}", "{}({}) at {}@{}:{} in {}",
"No connection", "No connection",
e.to_string(), e.to_string(),
file!(), file!(),
line!(), line!(),
column!() column!(),
fn_name::uninstantiated!()
); );
$f $f
} }
NetworkResult::AlreadyExists(e) => { NetworkResult::AlreadyExists(e) => {
log_network_result!( log_network_result!(
"{}({}) at {}@{}:{}", "{}({}) at {}@{}:{} in {}",
"Already exists", "Already exists",
e.to_string(), e.to_string(),
file!(), file!(),
line!(), line!(),
column!() column!(),
fn_name::uninstantiated!()
); );
$f $f
} }
NetworkResult::InvalidMessage(s) => { NetworkResult::InvalidMessage(s) => {
log_network_result!( log_network_result!(
"{}({}) at {}@{}:{}", "{}({}) at {}@{}:{} in {}",
"Invalid message", "Invalid message",
s, s,
file!(), file!(),
line!(), line!(),
column!() column!(),
fn_name::uninstantiated!()
); );
$f $f
} }