Merge branch 'dht-testing' into 'main'

More bug fixes for network and logging and ui

See merge request veilid/veilid!34
This commit is contained in:
John Smith 2023-06-21 20:01:12 +00:00
commit 14d184e151
16 changed files with 145 additions and 81 deletions

11
Cargo.lock generated
View File

@ -1414,9 +1414,9 @@ dependencies = [
[[package]]
name = "crossterm"
version = "0.26.1"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a84cda67535339806297f1b331d6dd6320470d2a0fe65381e79ee9e156dd3d13"
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
dependencies = [
"bitflags 1.3.2",
"crossterm_winapi",
@ -2071,6 +2071,12 @@ dependencies = [
"spin 0.9.8",
]
[[package]]
name = "fn_name"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528a0eb35b41b895aef1afed5ab28659084118e730edc72f033e76fb71666dbb"
[[package]]
name = "fnv"
version = "1.0.7"
@ -6591,6 +6597,7 @@ dependencies = [
"cfg-if 1.0.0",
"console_error_panic_hook",
"eyre",
"fn_name",
"futures-util",
"jni 0.21.1",
"jni-sys",

2
external/cursive vendored

@ -1 +1 @@
Subproject commit 7df15342b1f08b6b7a5cdc7aa8f97342d2e4ad7b
Subproject commit 631dad18f0b4b246914998e8952da8abb6093bd2

View File

@ -107,14 +107,21 @@ impl CommandProcessor {
self.ui_sender().add_node_event(
Level::Info,
r#"Commands:
exit/quit - exit the client
disconnect - disconnect the client from the Veilid node
shutdown - shut the server down
attach - attach the server to the Veilid network
detach - detach the server from the Veilid network
debug - send a debugging command to the Veilid server
change_log_level - change the log level for a tracing layer
reply - reply to an AppCall not handled directly by the server
exit/quit exit the client
disconnect disconnect the client from the Veilid node
shutdown shut the server down
attach attach the server to the Veilid network
detach detach the server from the Veilid network
debug [command] send a debugging command to the Veilid server
change_log_level <layer> <level> change the log level for a tracing layer
layers include:
all, terminal, system, api, file, otlp
levels include:
error, warn, info, debug, trace
reply <call id> <message> reply to an AppCall not handled directly by the server
<call id> must be exact call id reported in VeilidUpdate
<message> can be a string (left trimmed) or
it can start with a '#' followed by a string of undelimited hex bytes
"#
.to_owned(),
);
@ -188,7 +195,7 @@ reply - reply to an AppCall not handled directly by the server
spawn_detached_local(async move {
match capi.server_debug(rest.unwrap_or_default()).await {
Ok(output) => {
ui.add_node_event(Level::Debug, output);
ui.add_node_event(Level::Info, output);
ui.send_callback(callback);
}
Err(e) => {

View File

@ -8,6 +8,7 @@ use cursive::event::*;
use cursive::theme::*;
use cursive::traits::*;
use cursive::utils::markup::StyledString;
use cursive::view::SizeConstraint;
use cursive::views::*;
use cursive::Cursive;
use cursive::CursiveRunnable;
@ -504,6 +505,16 @@ impl UI {
}
}
fn on_focus_peers_table_view<T>(ptv: &mut ResizedView<T>) -> EventResult {
ptv.set_height(SizeConstraint::Full);
EventResult::Ignored
}
fn on_focus_lost_peers_table_view<T>(ptv: &mut ResizedView<T>) -> EventResult {
ptv.set_height(SizeConstraint::AtLeast(8));
EventResult::Ignored
}
fn show_connection_dialog(s: &mut Cursive, state: ConnectionState) -> bool {
let mut inner = Self::inner_mut(s);
@ -829,10 +840,13 @@ impl UI {
.column(PeerTableColumn::TransferDownAvg, "Down", |c| c.width(8))
.column(PeerTableColumn::TransferUpAvg, "Up", |c| c.width(8));
peers_table_view.set_on_submit(UI::on_submit_peers_table_view);
let peers_table_view = peers_table_view
.with_name("peers")
.full_width()
.min_height(8);
let peers_table_view = FocusTracker::new(ResizedView::new(
SizeConstraint::Full,
SizeConstraint::AtLeast(8),
peers_table_view.with_name("peers"),
))
.on_focus(UI::on_focus_peers_table_view)
.on_focus_lost(UI::on_focus_lost_peers_table_view);
// attempt at using Mux. Mux has bugs, like resizing problems.
// let mut mux = Mux::new();

View File

@ -196,6 +196,7 @@ impl AttachmentManager {
if let Err(err) = netman.startup().await {
error!("network startup failed: {}", err);
netman.shutdown().await;
restart = true;
break;
}

View File

@ -906,7 +906,7 @@ impl NetworkManager {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
bail!("reverse connect receipt cancelled from {:?}", target_nr);
return Ok(NetworkResult::no_connection_other(format!("reverse connect receipt cancelled from {}", target_nr)))
}
};
@ -1011,7 +1011,7 @@ impl NetworkManager {
return Ok(NetworkResult::timeout());
}
ReceiptEvent::Cancelled => {
bail!("hole punch receipt cancelled from {}", target_nr);
return Ok(NetworkResult::no_connection_other(format!("hole punch receipt cancelled from {}", target_nr)))
}
};

View File

@ -115,6 +115,15 @@ impl Network {
// tcp_stream.peer_addr().unwrap(),
// );
if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(debug "Couldn't set TCP linger: {}", e);
return;
}
if let Err(e) = tcp_stream.set_nodelay(true) {
log_net!(debug "Couldn't set TCP nodelay: {}", e);
return;
}
let listener_state = listener_state.clone();
let connection_manager = connection_manager.clone();

View File

@ -33,7 +33,7 @@ cfg_if! {
}
}
#[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", ret)]
pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
if domain == Domain::IPV6 {
@ -49,7 +49,7 @@ pub fn new_unbound_shared_udp_socket(domain: Domain) -> io::Result<Socket> {
Ok(socket)
}
#[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", ret)]
pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address);
let socket = new_unbound_shared_udp_socket(domain)?;
@ -61,7 +61,7 @@ pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> io::Result<Sock
Ok(socket)
}
#[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", ret)]
pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
@ -95,7 +95,7 @@ pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> io::Result<Socke
Ok(socket)
}
#[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", ret)]
pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
@ -117,7 +117,7 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
Ok(socket)
}
#[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", ret)]
pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address);
let socket = new_unbound_shared_tcp_socket(domain)?;
@ -129,7 +129,7 @@ pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> io::Result<Sock
Ok(socket)
}
#[instrument(level = "trace", ret, err)]
#[instrument(level = "trace", ret)]
pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result<Socket> {
let domain = Domain::for_address(local_address);

View File

@ -243,7 +243,24 @@ impl Network {
bail!("No valid listen address: {}", listen_address);
}
let port = sockaddrs[0].port();
if !self.bind_first_tcp_port(port) {
let mut attempts = 10;
let mut success = false;
while attempts >= 0 {
if self.bind_first_tcp_port(port) {
success = true;
break;
}
attempts -= 1;
// Wait 5 seconds before trying again
log_net!(debug
"Binding TCP port at {} failed, waiting. Attempts remaining = {}",
port, attempts
);
sleep(5000).await
}
if !success {
bail!("Could not find free tcp port to listen on");
}
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))

View File

@ -179,7 +179,7 @@ impl NetworkConnection {
}
}
#[instrument(level="trace", skip(message, stats), fields(message.len = message.len()), ret, err)]
#[instrument(level="trace", skip(message, stats), fields(message.len = message.len()), ret)]
async fn send_internal(
protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>,
@ -194,7 +194,7 @@ impl NetworkConnection {
Ok(NetworkResult::Value(out))
}
#[instrument(level="trace", skip(stats), fields(ret.len), err)]
#[instrument(level="trace", skip(stats), fields(ret.len))]
async fn recv_internal(
protocol_connection: &ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>,

View File

@ -5,7 +5,7 @@ impl RoutingTable {
pub fn find_all_closest_peers(&self, key: TypedKey) -> NetworkResult<Vec<PeerInfo>> {
let Some(own_peer_info) = self.get_own_peer_info(RoutingDomain::PublicInternet) else {
// Our own node info is not yet available, drop this request.
return NetworkResult::service_unavailable();
return NetworkResult::service_unavailable("Not finding closest peers because our peer info is not yet available");
};
// find N nodes closest to the target node in our routing table

View File

@ -591,20 +591,8 @@ impl VeilidAPI {
.map_err(VeilidAPIError::internal)?
{
NetworkResult::Value(v) => v,
NetworkResult::Timeout => {
return Ok("Timeout".to_owned());
}
NetworkResult::ServiceUnavailable => {
return Ok("ServiceUnavailable".to_owned());
}
NetworkResult::NoConnection(e) => {
return Ok(format!("NoConnection({})", e));
}
NetworkResult::AlreadyExists(e) => {
return Ok(format!("AlreadyExists({})", e));
}
NetworkResult::InvalidMessage(e) => {
return Ok(format!("InvalidMessage({})", e));
r => {
return Ok(r.to_string());
}
};

View File

@ -148,7 +148,10 @@ impl RoutingContext {
let answer = match rpc_processor.rpc_call_app_call(dest, request).await {
Ok(NetworkResult::Value(v)) => v,
Ok(NetworkResult::Timeout) => apibail_timeout!(),
Ok(NetworkResult::ServiceUnavailable) => apibail_try_again!(),
Ok(NetworkResult::ServiceUnavailable(e)) => {
log_network_result!(format!("app_call: ServiceUnavailable({})", e));
apibail_try_again!()
}
Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
apibail_no_connection!(e);
}
@ -173,7 +176,10 @@ impl RoutingContext {
match rpc_processor.rpc_call_app_message(dest, message).await {
Ok(NetworkResult::Value(())) => {}
Ok(NetworkResult::Timeout) => apibail_timeout!(),
Ok(NetworkResult::ServiceUnavailable) => apibail_try_again!(),
Ok(NetworkResult::ServiceUnavailable(e)) => {
log_network_result!(format!("app_message: ServiceUnavailable({})", e));
apibail_try_again!()
}
Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
apibail_no_connection!(e);
}

View File

@ -35,6 +35,7 @@ stop-token = { version = "^0", default-features = false }
rand = "^0.7"
rust-fsm = "^0"
backtrace = "^0"
fn_name = "^0"
# Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android

View File

@ -134,6 +134,8 @@ use parking_lot::*;
use stop_token::*;
use thiserror::Error as ThisError;
pub use fn_name;
// For iOS tests
#[no_mangle]
pub extern "C" fn main_rs() {}

View File

@ -68,7 +68,7 @@ impl<T, E> NetworkResultResultExt<T, E> for NetworkResult<Result<T, E>> {
fn into_result_network_result(self) -> Result<NetworkResult<T>, E> {
match self {
NetworkResult::Timeout => Ok(NetworkResult::<T>::Timeout),
NetworkResult::ServiceUnavailable => Ok(NetworkResult::<T>::ServiceUnavailable),
NetworkResult::ServiceUnavailable(s) => Ok(NetworkResult::<T>::ServiceUnavailable(s)),
NetworkResult::NoConnection(e) => Ok(NetworkResult::<T>::NoConnection(e)),
NetworkResult::AlreadyExists(e) => Ok(NetworkResult::<T>::AlreadyExists(e)),
NetworkResult::InvalidMessage(s) => Ok(NetworkResult::<T>::InvalidMessage(s)),
@ -161,7 +161,7 @@ impl<T> FoldedNetworkResultExt<T> for io::Result<NetworkResult<T>> {
#[must_use]
pub enum NetworkResult<T> {
Timeout,
ServiceUnavailable,
ServiceUnavailable(String),
NoConnection(io::Error),
AlreadyExists(io::Error),
InvalidMessage(String),
@ -172,8 +172,8 @@ impl<T> NetworkResult<T> {
pub fn timeout() -> Self {
Self::Timeout
}
pub fn service_unavailable() -> Self {
Self::ServiceUnavailable
pub fn service_unavailable<S: ToString>(s: S) -> Self {
Self::ServiceUnavailable(s.to_string())
}
pub fn no_connection(e: io::Error) -> Self {
Self::NoConnection(e)
@ -206,7 +206,7 @@ impl<T> NetworkResult<T> {
pub fn map<X, F: Fn(T) -> X>(self, f: F) -> NetworkResult<X> {
match self {
Self::Timeout => NetworkResult::<X>::Timeout,
Self::ServiceUnavailable => NetworkResult::<X>::ServiceUnavailable,
Self::ServiceUnavailable(s) => NetworkResult::<X>::ServiceUnavailable(s),
Self::NoConnection(e) => NetworkResult::<X>::NoConnection(e),
Self::AlreadyExists(e) => NetworkResult::<X>::AlreadyExists(e),
Self::InvalidMessage(s) => NetworkResult::<X>::InvalidMessage(s),
@ -216,9 +216,9 @@ impl<T> NetworkResult<T> {
pub fn into_result(self) -> Result<T, io::Error> {
match self {
Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")),
Self::ServiceUnavailable => Err(io::Error::new(
Self::ServiceUnavailable(s) => Err(io::Error::new(
io::ErrorKind::NotFound,
"Service unavailable",
format!("Service unavailable: {}", s),
)),
Self::NoConnection(e) => Err(e),
Self::AlreadyExists(e) => Err(e),
@ -244,7 +244,7 @@ impl<T: Debug> Debug for NetworkResult<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Timeout => write!(f, "Timeout"),
Self::ServiceUnavailable => write!(f, "ServiceUnavailable"),
Self::ServiceUnavailable(s) => f.debug_tuple("ServiceUnavailable").field(s).finish(),
Self::NoConnection(e) => f.debug_tuple("NoConnection").field(e).finish(),
Self::AlreadyExists(e) => f.debug_tuple("AlreadyExists").field(e).finish(),
Self::InvalidMessage(s) => f.debug_tuple("InvalidMessage").field(s).finish(),
@ -257,7 +257,7 @@ impl<T> Display for NetworkResult<T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Timeout => write!(f, "Timeout"),
Self::ServiceUnavailable => write!(f, "ServiceUnavailable"),
Self::ServiceUnavailable(s) => write!(f, "ServiceUnavailable({})", s),
Self::NoConnection(e) => write!(f, "NoConnection({})", e.kind()),
Self::AlreadyExists(e) => write!(f, "AlreadyExists({})", e.kind()),
Self::InvalidMessage(s) => write!(f, "InvalidMessage({})", s),
@ -276,7 +276,7 @@ macro_rules! network_result_try {
($r: expr) => {
match $r {
NetworkResult::Timeout => return Ok(NetworkResult::Timeout),
NetworkResult::ServiceUnavailable => return Ok(NetworkResult::ServiceUnavailable),
NetworkResult::ServiceUnavailable(s) => return Ok(NetworkResult::ServiceUnavailable(s)),
NetworkResult::NoConnection(e) => return Ok(NetworkResult::NoConnection(e)),
NetworkResult::AlreadyExists(e) => return Ok(NetworkResult::AlreadyExists(e)),
NetworkResult::InvalidMessage(s) => return Ok(NetworkResult::InvalidMessage(s)),
@ -289,9 +289,9 @@ macro_rules! network_result_try {
$f;
return Ok(NetworkResult::Timeout);
}
NetworkResult::ServiceUnavailable => {
NetworkResult::ServiceUnavailable(s) => {
$f;
return Ok(NetworkResult::ServiceUnavailable);
return Ok(NetworkResult::ServiceUnavailable(s));
}
NetworkResult::NoConnection(e) => {
$f;
@ -313,22 +313,22 @@ macro_rules! network_result_try {
#[macro_export]
macro_rules! log_network_result {
($text:expr) => {
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
info!(target: "network_result", "{}", $text)
} else {
// cfg_if::cfg_if! {
// if #[cfg(debug_assertions)] {
// info!(target: "network_result", "{}", $text)
// } else {
debug!(target: "network_result", "{}", $text)
}
}
// }
// }
};
($fmt:literal, $($arg:expr),+) => {
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
info!(target: "network_result", $fmt, $($arg),+);
} else {
// cfg_if::cfg_if! {
// if #[cfg(debug_assertions)] {
// info!(target: "network_result", $fmt, $($arg),+);
// } else {
debug!(target: "network_result", $fmt, $($arg),+);
}
}
// }
// }
};
}
@ -337,49 +337,61 @@ macro_rules! network_result_value_or_log {
($r: expr => $f:tt) => {
match $r {
NetworkResult::Timeout => {
log_network_result!("{} at {}@{}:{}", "Timeout", file!(), line!(), column!());
$f
}
NetworkResult::ServiceUnavailable => {
log_network_result!(
"{} at {}@{}:{}",
"ServiceUnavailable",
"{} at {}@{}:{} in {}",
"Timeout",
file!(),
line!(),
column!()
column!(),
fn_name::uninstantiated!()
);
$f
}
NetworkResult::ServiceUnavailable(s) => {
log_network_result!(
"{}({}) at {}@{}:{} in {}",
"ServiceUnavailable",
s,
file!(),
line!(),
column!(),
fn_name::uninstantiated!()
);
$f
}
NetworkResult::NoConnection(e) => {
log_network_result!(
"{}({}) at {}@{}:{}",
"{}({}) at {}@{}:{} in {}",
"No connection",
e.to_string(),
file!(),
line!(),
column!()
column!(),
fn_name::uninstantiated!()
);
$f
}
NetworkResult::AlreadyExists(e) => {
log_network_result!(
"{}({}) at {}@{}:{}",
"{}({}) at {}@{}:{} in {}",
"Already exists",
e.to_string(),
file!(),
line!(),
column!()
column!(),
fn_name::uninstantiated!()
);
$f
}
NetworkResult::InvalidMessage(s) => {
log_network_result!(
"{}({}) at {}@{}:{}",
"{}({}) at {}@{}:{} in {}",
"Invalid message",
s,
file!(),
line!(),
column!()
column!(),
fn_name::uninstantiated!()
);
$f
}