more log refactor

This commit is contained in:
John Smith 2021-12-17 19:18:25 -05:00
parent 46f8607998
commit 971fa94751
17 changed files with 283 additions and 237 deletions

View File

@ -663,14 +663,15 @@ impl Network {
let peer_socket_addr = descriptor let peer_socket_addr = descriptor
.remote .remote
.to_socket_addr() .to_socket_addr()
.map_err(|_| "unable to get socket address".to_owned())?; .map_err(map_to_string)
.map_err(logthru_net!(error))?;
if let Some(ph) = if let Some(ph) =
self.find_best_udp_protocol_handler(&peer_socket_addr, &descriptor.local) self.find_best_udp_protocol_handler(&peer_socket_addr, &descriptor.local)
{ {
ph.clone() ph.clone()
.send_message(data, peer_socket_addr) .send_message(data, peer_socket_addr)
.await .await
.map_err(|_| "unable to send udp message".to_owned())?; .map_err(logthru_net!())?;
// Data was consumed // Data was consumed
return Ok(None); return Ok(None);
} }
@ -683,11 +684,7 @@ impl Network {
.get_connection(descriptor) .get_connection(descriptor)
{ {
// connection exists, send over it // connection exists, send over it
entry entry.conn.send(data).await.map_err(logthru_net!())?;
.conn
.send(data)
.await
.map_err(|_| "failed to send tcp or ws message".to_owned())?;
// Data was consumed // Data was consumed
return Ok(None); return Ok(None);
@ -706,24 +703,22 @@ impl Network {
) -> Result<(), String> { ) -> Result<(), String> {
match &dial_info { match &dial_info {
DialInfo::UDP(_) => { DialInfo::UDP(_) => {
let peer_socket_addr = dial_info let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?;
.to_socket_addr()
.map_err(|_| "failed to resolve dial info for UDP dial info".to_owned())?;
RawUdpProtocolHandler::send_unbound_message(data, peer_socket_addr) RawUdpProtocolHandler::send_unbound_message(data, peer_socket_addr)
.await .await
.map_err(|_| "failed to send unbound message to UDP dial info".to_owned()) .map_err(logthru_net!())
} }
DialInfo::TCP(_) => { DialInfo::TCP(_) => {
let peer_socket_addr = dial_info let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?;
.to_socket_addr()
.map_err(|_| "failed to resolve dial info for TCP dial info".to_owned())?;
RawTcpProtocolHandler::send_unbound_message(data, peer_socket_addr) RawTcpProtocolHandler::send_unbound_message(data, peer_socket_addr)
.await .await
.map_err(|_| "failed to connect to TCP dial info".to_owned()) .map_err(logthru_net!())
} }
DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned()), DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned())
DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned()), .map_err(logthru_net!(error)),
DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned())
.map_err(logthru_net!(error)),
} }
} }
@ -736,38 +731,33 @@ impl Network {
let conn = match &dial_info { let conn = match &dial_info {
DialInfo::UDP(_) => { DialInfo::UDP(_) => {
let peer_socket_addr = dial_info let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?;
.to_socket_addr()
.map_err(|_| "failed to resolve dial info for UDP dial info".to_owned())?;
if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { if let Some(ph) = self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
return ph return ph
.send_message(data, peer_socket_addr) .send_message(data, peer_socket_addr)
.await .await
.map_err(|_| "failed to send message to UDP dial info".to_owned()); .map_err(logthru_net!());
} else { } else {
return Err("no appropriate UDP protocol handler for dial_info".to_owned()); return Err("no appropriate UDP protocol handler for dial_info".to_owned())
.map_err(logthru_net!(error));
} }
} }
DialInfo::TCP(_) => { DialInfo::TCP(_) => {
let peer_socket_addr = dial_info let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?;
.to_socket_addr()
.map_err(|_| "failed to resolve dial info for TCP dial info".to_owned())?;
let some_local_addr = self.find_best_tcp_local_address(&peer_socket_addr); let some_local_addr = self.find_best_tcp_local_address(&peer_socket_addr);
RawTcpProtocolHandler::connect(network_manager, some_local_addr, peer_socket_addr) RawTcpProtocolHandler::connect(network_manager, some_local_addr, peer_socket_addr)
.await .await
.map_err(|_| "failed to connect to TCP dial info".to_owned())? .map_err(logthru_net!())?
} }
DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
.await .await
.map_err(|_| "failed to connect to WS dial info".to_owned())?, .map_err(logthru_net!(error))?,
DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
.await .await
.map_err(|_| "failed to connect to WSS dial info".to_owned())?, .map_err(logthru_net!(error))?,
}; };
conn.send(data) conn.send(data).await.map_err(logthru_net!(error))
.await
.map_err(|_| "failed to send data to dial info".to_owned())
} }
pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> { pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> {

View File

@ -210,7 +210,7 @@ impl RawTcpProtocolHandler {
socket socket
.connect(&remote_socket2_addr) .connect(&remote_socket2_addr)
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!("addr={}", remote_socket_addr))?; .map_err(logthru_net!(error "addr={}", remote_socket_addr))?;
let std_stream: std::net::TcpStream = socket.into(); let std_stream: std::net::TcpStream = socket.into();
let ts = TcpStream::from(std_stream); let ts = TcpStream::from(std_stream);

View File

@ -63,10 +63,10 @@ impl RawUdpProtocolHandler {
pub async fn send_message(&self, data: Vec<u8>, socket_addr: SocketAddr) -> Result<(), String> { pub async fn send_message(&self, data: Vec<u8>, socket_addr: SocketAddr) -> Result<(), String> {
if data.len() > MAX_MESSAGE_SIZE { if data.len() > MAX_MESSAGE_SIZE {
return Err("sending too large UDP message".to_owned()); return Err("sending too large UDP message".to_owned()).map_err(logthru_net!(error));
} }
trace!( log_net!(
"sending UDP message of length {} to {}", "sending UDP message of length {} to {}",
data.len(), data.len(),
socket_addr socket_addr
@ -76,9 +76,11 @@ impl RawUdpProtocolHandler {
let len = socket let len = socket
.send_to(&data, socket_addr) .send_to(&data, socket_addr)
.await .await
.map_err(|e| format!("{}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error "failed udp send: addr={}", socket_addr))?;
if len != data.len() { if len != data.len() {
Err("UDP partial send".to_owned()) Err("UDP partial send".to_owned()).map_err(logthru_net!(error))
} else { } else {
Ok(()) Ok(())
} }
@ -89,10 +91,10 @@ impl RawUdpProtocolHandler {
socket_addr: SocketAddr, socket_addr: SocketAddr,
) -> Result<(), String> { ) -> Result<(), String> {
if data.len() > MAX_MESSAGE_SIZE { if data.len() > MAX_MESSAGE_SIZE {
return Err("sending too large unbound UDP message".to_owned()); return Err("sending too large unbound UDP message".to_owned())
.map_err(logthru_net!(error));
} }
xxx continue here log_net!(
trace!(
"sending unbound message of length {} to {}", "sending unbound message of length {} to {}",
data.len(), data.len(),
socket_addr socket_addr
@ -107,13 +109,15 @@ impl RawUdpProtocolHandler {
}; };
let socket = UdpSocket::bind(local_socket_addr) let socket = UdpSocket::bind(local_socket_addr)
.await .await
.map_err(|e| format!("{}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error "failed to bind unbound udp socket"))?;
let len = socket let len = socket
.send_to(&data, socket_addr) .send_to(&data, socket_addr)
.await .await
.map_err(|e| format!("{}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error "failed unbound udp send: addr={}", socket_addr))?;
if len != data.len() { if len != data.len() {
Err("UDP partial unbound send".to_owned()) Err("UDP partial unbound send".to_owned()).map_err(logthru_net!(error))
} else { } else {
Ok(()) Ok(())
} }

View File

@ -101,6 +101,7 @@ where
.send(Message::binary(message)) .send(Message::binary(message))
.await .await
.map_err(map_to_string) .map_err(map_to_string)
.map_err(logthru_net!(error "failed to send websocket message"))
}) })
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
@ -112,17 +113,18 @@ where
let out = match inner.ws_stream.next().await { let out = match inner.ws_stream.next().await {
Some(Ok(Message::Binary(v))) => v, Some(Ok(Message::Binary(v))) => v,
Some(Ok(_)) => { Some(Ok(_)) => {
return Err("Unexpected WS message type".to_owned()); return Err("Unexpected WS message type".to_owned())
.map_err(logthru_net!(error));
} }
Some(Err(e)) => { Some(Err(e)) => {
return Err(e.to_string()); return Err(e.to_string()).map_err(logthru_net!(error));
} }
None => { None => {
return Err("WS stream closed".to_owned()); return Err("WS stream closed".to_owned()).map_err(logthru_net!());
} }
}; };
if out.len() > MAX_MESSAGE_SIZE { if out.len() > MAX_MESSAGE_SIZE {
Err("sending too large WS message".to_owned()) Err("sending too large WS message".to_owned()).map_err(logthru_net!(error))
} else { } else {
Ok(out) Ok(out)
} }
@ -189,7 +191,10 @@ impl WebsocketProtocolHandler {
{ {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {
return Err(format!("failed to peek stream: {:?}", e)); if e.kind() == io::ErrorKind::TimedOut {
return Err(e).map_err(map_to_string).map_err(logthru_net!());
}
return Err(e).map_err(map_to_string).map_err(logthru_net!(error));
} }
} }
// Check for websocket path // Check for websocket path
@ -261,13 +266,16 @@ impl WebsocketProtocolHandler {
let tcp_stream = TcpStream::connect(format!("{}:{}", &domain, &port)) let tcp_stream = TcpStream::connect(format!("{}:{}", &domain, &port))
.await .await
.map_err(|e| format!("failed to connect tcp stream: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error))?;
let local_addr = tcp_stream let local_addr = tcp_stream
.local_addr() .local_addr()
.map_err(|e| format!("can't get local address for tcp stream: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error))?;
let peer_socket_addr = tcp_stream let peer_socket_addr = tcp_stream
.peer_addr() .peer_addr()
.map_err(|e| format!("can't get peer address for tcp stream: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error))?;
let peer_addr = PeerAddress::new( let peer_addr = PeerAddress::new(
Address::from_socket_addr(peer_socket_addr), Address::from_socket_addr(peer_socket_addr),
peer_socket_addr.port(), peer_socket_addr.port(),
@ -279,10 +287,12 @@ impl WebsocketProtocolHandler {
let tls_stream = connector let tls_stream = connector
.connect(domain, tcp_stream) .connect(domain, tcp_stream)
.await .await
.map_err(|e| format!("can't connect tls: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error))?;
let (ws_stream, _response) = client_async(request, tls_stream) let (ws_stream, _response) = client_async(request, tls_stream)
.await .await
.map_err(|e| format!("wss negotation failed: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error))?;
let conn = NetworkConnection::Wss(WebsocketNetworkConnection::new(tls, ws_stream)); let conn = NetworkConnection::Wss(WebsocketNetworkConnection::new(tls, ws_stream));
network_manager network_manager
.on_new_connection( .on_new_connection(
@ -294,7 +304,8 @@ impl WebsocketProtocolHandler {
} else { } else {
let (ws_stream, _response) = client_async(request, tcp_stream) let (ws_stream, _response) = client_async(request, tcp_stream)
.await .await
.map_err(|e| format!("ws negotiate failed: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!(error))?;
let conn = NetworkConnection::Ws(WebsocketNetworkConnection::new(tls, ws_stream)); let conn = NetworkConnection::Ws(WebsocketNetworkConnection::new(tls, ws_stream));
network_manager network_manager
.on_new_connection( .on_new_connection(

View File

@ -12,14 +12,14 @@ impl Network {
async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddr> { async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddr> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let rpc = routing_table.rpc_processor(); let rpc = routing_table.rpc_processor();
let info_answer = match rpc.rpc_call_info(node_ref.clone()).await { rpc.rpc_call_info(node_ref.clone())
Err(e) => { .await
trace!("failed to get info answer from {:?}: {:?}", node_ref, e); .map_err(logthru_net!(
return None; "failed to get info answer from {:?}",
} node_ref
Ok(ia) => ia, ))
}; .map(|info_answer| info_answer.sender_info.socket_address)
info_answer.sender_info.socket_address .unwrap_or(None)
} }
// find fast peers with a particular address type, and ask them to tell us what our external address is // find fast peers with a particular address type, and ask them to tell us what our external address is
@ -31,7 +31,7 @@ impl Network {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let peers = routing_table.get_fast_nodes_of_type(protocol_address_type); let peers = routing_table.get_fast_nodes_of_type(protocol_address_type);
if peers.is_empty() { if peers.is_empty() {
trace!("no peers of type '{:?}'", protocol_address_type); log_net!("no peers of type '{:?}'", protocol_address_type);
return None; return None;
} }
for peer in peers { for peer in peers {
@ -44,7 +44,7 @@ impl Network {
return Some((sa, peer)); return Some((sa, peer));
} }
} }
trace!("no peers responded with an external address"); log_net!("no peers responded with an external address");
None None
} }
@ -78,19 +78,13 @@ impl Network {
) -> bool { ) -> bool {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let rpc = routing_table.rpc_processor(); let rpc = routing_table.rpc_processor();
match rpc rpc.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect, alternate_port)
.rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect, alternate_port)
.await .await
{ .map_err(logthru_net!(
Err(e) => { "failed to send validate_dial_info to {:?}",
error!( node_ref
"failed to send validate_dial_info to {:?}: {:?}", ))
node_ref, e .unwrap_or(false)
);
false
}
Ok(val) => val,
}
} }
async fn try_port_mapping<I: AsRef<[SocketAddr]>>( async fn try_port_mapping<I: AsRef<[SocketAddr]>>(
@ -103,7 +97,7 @@ impl Network {
} }
pub async fn update_udpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { pub async fn update_udpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> {
trace!("looking for udpv4 public dial info"); log_net!("looking for udpv4 public dial info");
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let mut retry_count = { let mut retry_count = {
@ -148,7 +142,7 @@ impl Network {
break; break;
} else { } else {
// UDP firewall? // UDP firewall?
warn!("UDP static public dial info not reachable. UDP firewall may be blocking inbound to {:?} for {:?}",external1_dial_info, node_b); log_net!("UDP static public dial info not reachable. UDP firewall may be blocking inbound to {:?} for {:?}",external1_dial_info, node_b);
} }
} else { } else {
// There is -some NAT- // There is -some NAT-
@ -260,7 +254,7 @@ impl Network {
} }
pub async fn update_tcpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { pub async fn update_tcpv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> {
trace!("looking for tcpv4 public dial info"); log_net!("looking for tcpv4 public dial info");
// xxx // xxx
//Err("unimplemented".to_owned()) //Err("unimplemented".to_owned())
Ok(()) Ok(())

View File

@ -45,8 +45,12 @@ impl Network {
data: Vec<u8>, data: Vec<u8>,
) -> Result<Option<Vec<u8>>, String> { ) -> Result<Option<Vec<u8>>, String> {
match descriptor.protocol_type() { match descriptor.protocol_type() {
ProtocolType::UDP => return Err("no support for udp protocol".to_owned()), ProtocolType::UDP => {
ProtocolType::TCP => return Err("no support for tcp protocol".to_owned()), return Err("no support for udp protocol".to_owned()).map_err(logthru_net!(error))
}
ProtocolType::TCP => {
return Err("no support for tcp protocol".to_owned()).map_err(logthru_net!(error))
}
ProtocolType::WS | ProtocolType::WSS => { ProtocolType::WS | ProtocolType::WSS => {
// find an existing connection in the connection table if one exists // find an existing connection in the connection table if one exists
let network_manager = self.inner.lock().network_manager.clone(); let network_manager = self.inner.lock().network_manager.clone();
@ -55,11 +59,7 @@ impl Network {
.get_connection(&descriptor) .get_connection(&descriptor)
{ {
// connection exists, send over it // connection exists, send over it
entry entry.conn.send(data).await.map_err(logthru_net!())?;
.conn
.send(data)
.await
.map_err(|_| "failed to send ws message".to_owned())?;
// Data was consumed // Data was consumed
return Ok(None); return Ok(None);
} }
@ -78,10 +78,16 @@ impl Network {
let network_manager = self.inner.lock().network_manager.clone(); let network_manager = self.inner.lock().network_manager.clone();
match &dial_info { match &dial_info {
DialInfo::UDP(_) => return Err("no support for UDP protocol".to_owned()), DialInfo::UDP(_) => {
DialInfo::TCP(_) => return Err("no support for TCP protocol".to_owned()), return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned()), }
DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned()), DialInfo::TCP(_) => {
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
}
DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned())
.map_err(logthru_net!(error)),
DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned())
.map_err(logthru_net!(error)),
} }
} }
pub async fn send_data_to_dial_info( pub async fn send_data_to_dial_info(
@ -92,19 +98,21 @@ impl Network {
let network_manager = self.inner.lock().network_manager.clone(); let network_manager = self.inner.lock().network_manager.clone();
let conn = match &dial_info { let conn = match &dial_info {
DialInfo::UDP(_) => return Err("no support for UDP protocol".to_owned()), DialInfo::UDP(_) => {
DialInfo::TCP(_) => return Err("no support for TCP protocol".to_owned()), return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
}
DialInfo::TCP(_) => {
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
}
DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
.await .await
.map_err(|_| "failed to connect to WS dial info".to_owned())?, .map_err(logthru_net!(error))?,
DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info) DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
.await .await
.map_err(|_| "failed to connect to WSS dial info".to_owned())?, .map_err(logthru_net!(error))?,
}; };
conn.send(data) conn.send(data).await.map_err(logthru_net!(error))
.await
.map_err(|_| "failed to send data to dial info".to_owned())
} }
pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> { pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> {
@ -129,9 +137,13 @@ impl Network {
// If that fails, try to make a connection or reach out to the peer via its dial info // If that fails, try to make a connection or reach out to the peer via its dial info
if let Some(di) = dial_info { if let Some(di) = dial_info {
self.clone().send_data_to_dial_info(&di, di_data).await self.clone()
.send_data_to_dial_info(&di, di_data)
.await
.map_err(logthru_net!(error))
} else { } else {
Err("couldn't send data, no dial info or peer address".to_owned()) Err("couldn't send data, no dial info or peer address".to_owned())
.map_err(logthru_net!(error))
} }
} }

View File

@ -11,10 +11,10 @@ impl DummyNetworkConnection {
pub fn protocol_type(&self) -> ProtocolType { pub fn protocol_type(&self) -> ProtocolType {
ProtocolType::UDP ProtocolType::UDP
} }
pub fn send(&self, _message: Vec<u8>) -> SystemPinBoxFuture<Result<(), ()>> { pub fn send(&self, _message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
Box::pin(async { Ok(()) }) Box::pin(async { Ok(()) })
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, ()>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
Box::pin(async { Ok(Vec::new()) }) Box::pin(async { Ok(Vec::new()) })
} }
} }

View File

@ -56,28 +56,31 @@ impl WebsocketNetworkConnection {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
return Err("sending too large WS message".to_owned()); return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error));
} }
inner.lock().ws.send_with_u8_array(&message).map_err(map_to_string) inner
.lock()
.ws
.send_with_u8_array(&message)
.map_err(|_| "failed to send to websocket".to_owned())
.map_err(logthru_net!(error))
}) })
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String)>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
let out = match inner.lock().ws_stream.next().await { let out = match inner.lock().ws_stream.next().await {
Some(WsMessage::Binary(v)) => v, Some(WsMessage::Binary(v)) => v,
Some(_) => { Some(_) => {
return Err("Unexpected WS message type".to_owned()); return Err("Unexpected WS message type".to_owned())
} .map_err(logthru_net!(error));
Some(Err(e)) => {
return Err(|e| e.to_string());
} }
None => { None => {
return Err("WS stream closed".to_owned()); return Err("WS stream closed".to_owned()).map_err(logthru_net!(error));
} }
}; };
if out.len() > MAX_MESSAGE_SIZE { if out.len() > MAX_MESSAGE_SIZE {
Err("sending too large WS message".to_owned()) Err("sending too large WS message".to_owned()).map_err(logthru_net!(error))
} else { } else {
Ok(out) Ok(out)
} }
@ -99,15 +102,22 @@ impl WebsocketProtocolHandler {
let (tls, host, port, protocol_type) = match dial_info { let (tls, host, port, protocol_type) = match dial_info {
DialInfo::WS(ws) => (false, ws.host.clone(), ws.port, ProtocolType::WS), DialInfo::WS(ws) => (false, ws.host.clone(), ws.port, ProtocolType::WS),
DialInfo::WSS(wss) => (true, wss.host.clone(), wss.port, ProtocolType::WSS), DialInfo::WSS(wss) => (true, wss.host.clone(), wss.port, ProtocolType::WSS),
_ => return Err("wrong protocol for WebsocketProtocolHandler".to_owned()), _ => {
return Err("wrong protocol for WebsocketProtocolHandler".to_owned())
.map_err(logthru_net!(error))
}
}; };
let peer_addr = PeerAddress::new(Address::from_str(&host)?, port, protocol_type); let peer_addr = PeerAddress::new(
Address::from_str(&host).map_err(logthru_net!(error))?,
port,
protocol_type,
);
let (ws, wsio) = match WsMeta::connect(url, None).await { let (ws, wsio) = WsMeta::connect(url, None)
Ok(conn) => conn, .await
Err(e) => return Err(format!("couldn't connect to WS url: {}", e)), .map_err(map_to_string)
}; .map_err(logthru_net!(error))?;
let conn = NetworkConnection::WS(WebsocketNetworkConnection::new(tls, ws, wsio)); let conn = NetworkConnection::WS(WebsocketNetworkConnection::new(tls, ws, wsio));
network_manager network_manager

View File

@ -29,11 +29,11 @@ pub fn get_timestamp() -> u64 {
} }
pub fn get_timestamp_string() -> String { pub fn get_timestamp_string() -> String {
let date = Date::now(); let date = Date::new_0();
let hours = Date::get_utc_hours(date); let hours = Date::get_utc_hours(&date);
let minutes = Date::get_utc_minutes(date); let minutes = Date::get_utc_minutes(&date);
let seconds = Date::get_utc_seconds(date); let seconds = Date::get_utc_seconds(&date);
let milliseconds = Date::get_utc_milliseconds(date); let milliseconds = Date::get_utc_milliseconds(&date);
format!( format!(
"{:02}:{:02}:{:02}.{}", "{:02}:{:02}:{:02}.{}",
hours, minutes, seconds, milliseconds hours, minutes, seconds, milliseconds

View File

@ -1,5 +1,6 @@
use crate::xx::*; use crate::xx::*;
use alloc::collections::VecDeque; use alloc::collections::VecDeque;
use core::fmt;
#[derive(Debug)] #[derive(Debug)]
pub struct Channel<T> { pub struct Channel<T> {
@ -58,6 +59,19 @@ pub enum TrySendError<T> {
Disconnected(T), Disconnected(T),
} }
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TrySendError::Full(_) => {
write!(f, "Full")
}
TrySendError::Disconnected(_) => {
write!(f, "Disconnected")
}
}
}
}
impl<T> Sender<T> { impl<T> Sender<T> {
// NOTE: This needs a timeout or you could block a very long time // NOTE: This needs a timeout or you could block a very long time
// pub async fn send(&self, msg: T) -> Result<(), SendError<T>> { // pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {

View File

@ -28,7 +28,7 @@ impl Bucket {
} }
pub(super) fn add_entry(&mut self, node_id: DHTKey) -> NodeRef { pub(super) fn add_entry(&mut self, node_id: DHTKey) -> NodeRef {
info!("Node added: {}", node_id.encode()); log_rtab!("Node added: {}", node_id.encode());
// Add new entry // Add new entry
self.entries.insert(node_id, BucketEntry::new()); self.entries.insert(node_id, BucketEntry::new());
@ -42,7 +42,7 @@ impl Bucket {
} }
pub(super) fn remove_entry(&mut self, node_id: &DHTKey) { pub(super) fn remove_entry(&mut self, node_id: &DHTKey) {
info!("Node removed: {}", node_id); log_rtab!("Node removed: {}", node_id);
// Remove the entry // Remove the entry
self.entries.remove(node_id); self.entries.remove(node_id);

View File

@ -215,7 +215,7 @@ impl RoutingTable {
// transform, // transform,
transform, transform,
); );
trace!(">> find_fastest_nodes: node count = {}", out.len()); log_rtab!(">> find_fastest_nodes: node count = {}", out.len());
out out
} }
@ -272,7 +272,7 @@ impl RoutingTable {
// transform, // transform,
transform, transform,
); );
trace!(">> find_closest_nodes: node count = {}", out.len()); log_rtab!(">> find_closest_nodes: node count = {}", out.len());
out out
} }
} }

View File

@ -393,7 +393,7 @@ impl RoutingTable {
if let Some(dead_node_ids) = bucket.kick(bucket_depth) { if let Some(dead_node_ids) = bucket.kick(bucket_depth) {
// Remove counts // Remove counts
inner.bucket_entry_count -= dead_node_ids.len(); inner.bucket_entry_count -= dead_node_ids.len();
debug!("Routing table now has {} nodes", inner.bucket_entry_count); log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count);
// Now purge the routing table inner vectors // Now purge the routing table inner vectors
//let filter = |k: &DHTKey| dead_node_ids.contains(k); //let filter = |k: &DHTKey| dead_node_ids.contains(k);
@ -431,7 +431,7 @@ impl RoutingTable {
pub fn create_node_ref(&self, node_id: DHTKey) -> Result<NodeRef, String> { pub fn create_node_ref(&self, node_id: DHTKey) -> Result<NodeRef, String> {
// Ensure someone isn't trying register this node itself // Ensure someone isn't trying register this node itself
if node_id == self.node_id() { if node_id == self.node_id() {
return Err("can't register own node".to_owned()); return Err("can't register own node".to_owned()).map_err(logthru_rtab!(error));
} }
// Insert into bucket, possibly evicting the newest bucket member // Insert into bucket, possibly evicting the newest bucket member
@ -448,7 +448,7 @@ impl RoutingTable {
// Update count // Update count
inner.bucket_entry_count += 1; inner.bucket_entry_count += 1;
debug!("Routing table now has {} nodes", inner.bucket_entry_count); log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count);
nr nr
}; };
@ -480,13 +480,7 @@ impl RoutingTable {
node_id: DHTKey, node_id: DHTKey,
dial_infos: &[DialInfo], dial_infos: &[DialInfo],
) -> Result<NodeRef, String> { ) -> Result<NodeRef, String> {
let nr = match self.create_node_ref(node_id) { let nr = self.create_node_ref(node_id)?;
Err(e) => {
return Err(format!("Couldn't create node reference: {}", e));
}
Ok(v) => v,
};
nr.operate(move |e| -> Result<(), String> { nr.operate(move |e| -> Result<(), String> {
for di in dial_infos { for di in dial_infos {
e.add_dial_info(di.clone())?; e.add_dial_info(di.clone())?;
@ -505,13 +499,7 @@ impl RoutingTable {
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
timestamp: u64, timestamp: u64,
) -> Result<NodeRef, String> { ) -> Result<NodeRef, String> {
let nr = match self.create_node_ref(node_id) { let nr = self.create_node_ref(node_id)?;
Err(e) => {
return Err(format!("Couldn't create node reference: {}", e));
}
Ok(v) => v,
};
nr.operate(move |e| { nr.operate(move |e| {
// set the most recent node address for connection finding and udp replies // set the most recent node address for connection finding and udp replies
e.set_last_connection(descriptor, timestamp); e.set_last_connection(descriptor, timestamp);
@ -535,7 +523,7 @@ impl RoutingTable {
let node_id = self.node_id(); let node_id = self.node_id();
let rpc_processor = self.rpc_processor(); let rpc_processor = self.rpc_processor();
let res = match rpc_processor let res = rpc_processor
.rpc_call_find_node( .rpc_call_find_node(
Destination::Direct(node_ref.clone()), Destination::Direct(node_ref.clone()),
node_id, node_id,
@ -543,13 +531,9 @@ impl RoutingTable {
RespondTo::Sender, RespondTo::Sender,
) )
.await .await
{ .map_err(map_to_string)
Ok(v) => v, .map_err(logthru_rtab!())?;
Err(e) => { log_rtab!(
return Err(format!("couldn't contact node at {:?}: {}", &node_ref, e));
}
};
trace!(
"find_self for at {:?} answered in {}ms", "find_self for at {:?} answered in {}ms",
&node_ref, &node_ref,
timestamp_to_secs(res.latency) * 1000.0f64 timestamp_to_secs(res.latency) * 1000.0f64
@ -564,15 +548,14 @@ impl RoutingTable {
} }
// register the node if it's new // register the node if it's new
let nr = match self.register_node_with_dial_info(p.node_id.key, &p.dial_infos) { let nr = self
Ok(v) => v, .register_node_with_dial_info(p.node_id.key, &p.dial_infos)
Err(e) => { .map_err(map_to_string)
return Err(format!( .map_err(logthru_rtab!(
"couldn't register node {} at {:?}: {}", "couldn't register node {} at {:?}",
p.node_id.key, &p.dial_infos, e p.node_id.key,
)); &p.dial_infos
} ))?;
};
out.push(nr); out.push(nr);
} }
Ok(out) Ok(out)
@ -585,7 +568,7 @@ impl RoutingTable {
// Ask bootstrap server for nodes closest to our own node // Ask bootstrap server for nodes closest to our own node
let closest_nodes = match self.find_self(node_ref.clone()).await { let closest_nodes = match self.find_self(node_ref.clone()).await {
Err(e) => { Err(e) => {
error!( log_rtab!(error
"reverse_find_node: find_self failed for {:?}: {}", "reverse_find_node: find_self failed for {:?}: {}",
&node_ref, e &node_ref, e
); );
@ -599,7 +582,7 @@ impl RoutingTable {
for closest_nr in closest_nodes { for closest_nr in closest_nodes {
match self.find_self(closest_nr.clone()).await { match self.find_self(closest_nr.clone()).await {
Err(e) => { Err(e) => {
error!( log_rtab!(error
"reverse_find_node: closest node find_self failed for {:?}: {}", "reverse_find_node: closest node find_self failed for {:?}: {}",
&closest_nr, e &closest_nr, e
); );
@ -617,36 +600,28 @@ impl RoutingTable {
c.network.bootstrap.clone() c.network.bootstrap.clone()
}; };
debug!("--- bootstrap_task"); log_rtab!("--- bootstrap_task");
// Map all bootstrap entries to a single key with multiple dialinfo // Map all bootstrap entries to a single key with multiple dialinfo
let mut bsmap: BTreeMap<DHTKey, Vec<DialInfo>> = BTreeMap::new(); let mut bsmap: BTreeMap<DHTKey, Vec<DialInfo>> = BTreeMap::new();
for b in bootstrap { for b in bootstrap {
let ndis = match NodeDialInfoSingle::from_str(b.as_str()) { let ndis = NodeDialInfoSingle::from_str(b.as_str())
Err(_) => { .map_err(logthru_rtab!("Invalid dial info in bootstrap entry: {}", b))?;
return Err(format!("Invalid dial info in bootstrap entry: {}", b));
}
Ok(v) => v,
};
let node_id = ndis.node_id.key; let node_id = ndis.node_id.key;
bsmap bsmap
.entry(node_id) .entry(node_id)
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
.push(ndis.dial_info); .push(ndis.dial_info);
} }
trace!(" bootstrap list: {:?}", bsmap); log_rtab!(" bootstrap list: {:?}", bsmap);
// Run all bootstrap operations concurrently // Run all bootstrap operations concurrently
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
for (k, v) in bsmap { for (k, v) in bsmap {
let nr = match self.register_node_with_dial_info(k, &v) { let nr = self
Ok(nr) => nr, .register_node_with_dial_info(k, &v)
Err(e) => { .map_err(logthru_rtab!("Couldn't add bootstrap node: {}", k))?;
return Err(format!("Couldn't add bootstrap node: {}", e)); log_rtab!(" bootstrapping {} with {:?}", k.encode(), &v);
}
};
trace!(" bootstrapping {} with {:?}", k.encode(), &v);
unord.push(self.reverse_find_node(nr, true)); unord.push(self.reverse_find_node(nr, true));
} }
while unord.next().await.is_some() {} while unord.next().await.is_some() {}
@ -659,7 +634,7 @@ impl RoutingTable {
// Ask our remaining peers to give us more peers before we go // Ask our remaining peers to give us more peers before we go
// back to the bootstrap servers to keep us from bothering them too much // back to the bootstrap servers to keep us from bothering them too much
async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> { async fn peer_minimum_refresh_task_routine(self) -> Result<(), String> {
trace!("--- peer_minimum_refresh task"); log_rtab!("--- peer_minimum_refresh task");
// get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too // get list of all peers we know about, even the unreliable ones, and ask them to bootstrap too
let noderefs = { let noderefs = {
@ -672,7 +647,7 @@ impl RoutingTable {
} }
noderefs noderefs
}; };
trace!(" refreshing with nodes: {:?}", noderefs); log_rtab!(" refreshing with nodes: {:?}", noderefs);
// do peer minimum search concurrently // do peer minimum search concurrently
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
@ -688,14 +663,14 @@ impl RoutingTable {
// Ping each node in the routing table if they need to be pinged // Ping each node in the routing table if they need to be pinged
// to determine their reliability // to determine their reliability
async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> { async fn ping_validator_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> {
trace!("--- ping_validator task"); log_rtab!("--- ping_validator task");
let rpc = self.rpc_processor(); let rpc = self.rpc_processor();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
for b in &mut inner.buckets { for b in &mut inner.buckets {
for (k, entry) in b.entries_mut() { for (k, entry) in b.entries_mut() {
if entry.needs_ping(cur_ts) { if entry.needs_ping(cur_ts) {
let nr = NodeRef::new(self.clone(), *k, entry); let nr = NodeRef::new(self.clone(), *k, entry);
debug!( log_rtab!(
" --- ping validating: {:?} ({})", " --- ping validating: {:?} ({})",
nr, nr,
entry.state_debug_info(cur_ts) entry.state_debug_info(cur_ts)
@ -709,7 +684,7 @@ impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is // Compute transfer statistics to determine how 'fast' a node is
async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { async fn rolling_transfers_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> {
trace!("--- rolling_transfers task"); log_rtab!("--- rolling_transfers task");
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
// Roll our own node's transfers // Roll our own node's transfers

View File

@ -98,14 +98,18 @@ where
let wordvec = builder let wordvec = builder
.into_reader() .into_reader()
.canonicalize() .canonicalize()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec()) Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
} }
fn reader_to_vec<'a, T>(reader: capnp::message::Reader<T>) -> Result<Vec<u8>, RPCError> fn reader_to_vec<'a, T>(reader: capnp::message::Reader<T>) -> Result<Vec<u8>, RPCError>
where where
T: capnp::message::ReaderSegments + 'a, T: capnp::message::ReaderSegments + 'a,
{ {
let wordvec = reader.canonicalize().map_err(map_error_capnp_error!())?; let wordvec = reader
.canonicalize()
.map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec()) Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
} }
@ -222,7 +226,7 @@ impl RPCProcessor {
// xxx find node but stop if we find the exact node we want // xxx find node but stop if we find the exact node we want
// xxx return whatever node is closest after the timeout // xxx return whatever node is closest after the timeout
Err(rpc_error_unimplemented("search_dht_single_key")) Err(rpc_error_unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error))
} }
// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references // Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
@ -234,7 +238,7 @@ impl RPCProcessor {
_timeout: Option<u64>, _timeout: Option<u64>,
) -> Result<Vec<NodeRef>, RPCError> { ) -> Result<Vec<NodeRef>, RPCError> {
// xxx return closest nodes after the timeout // xxx return closest nodes after the timeout
Err(rpc_error_unimplemented("search_dht_multi_key")) Err(rpc_error_unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error))
} }
// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
@ -254,8 +258,7 @@ impl RPCProcessor {
if nr.node_id() != node_id { if nr.node_id() != node_id {
// found a close node, but not exact within our configured resolve_node timeout // found a close node, but not exact within our configured resolve_node timeout
error!("XXX: Timeout"); return Err(RPCError::Timeout).map_err(logthru_rpc!());
return Err(RPCError::Timeout);
} }
Ok(nr) Ok(nr)
@ -559,7 +562,8 @@ impl RPCProcessor {
let request_operation = request_rpcreader let request_operation = request_rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
let reply_vec = reader_to_vec(reply_msg)?; let reply_vec = reader_to_vec(reply_msg)?;
@ -567,12 +571,14 @@ impl RPCProcessor {
match request_operation match request_operation
.get_respond_to() .get_respond_to()
.which() .which()
.map_err(map_error_internal!("invalid request operation"))? .map_err(map_error_internal!("invalid request operation"))
.map_err(logthru_rpc!())?
{ {
veilid_capnp::operation::respond_to::None(_) => { veilid_capnp::operation::respond_to::None(_) => {
// Do not respond // Do not respond
// -------------- // --------------
return Err(rpc_error_internal("no response requested")); return Err(rpc_error_internal("no response requested"))
.map_err(logthru_rpc!());
} }
veilid_capnp::operation::respond_to::Sender(_) => { veilid_capnp::operation::respond_to::Sender(_) => {
// Respond to envelope source node, possibly through a relay if the request arrived that way // Respond to envelope source node, possibly through a relay if the request arrived that way
@ -594,17 +600,20 @@ impl RPCProcessor {
// No private route was specified for the return // No private route was specified for the return
// but we are using a safety route, so we must create an empty private route // but we are using a safety route, so we must create an empty private route
let mut pr_builder = ::capnp::message::Builder::new_default(); let mut pr_builder = ::capnp::message::Builder::new_default();
let private_route = self.new_stub_private_route( let private_route = self
.new_stub_private_route(
request_rpcreader.header.envelope.get_sender_id(), request_rpcreader.header.envelope.get_sender_id(),
&mut pr_builder, &mut pr_builder,
)?; )
.map_err(logthru_rpc!())?;
out = self.wrap_with_route(Some(sr), private_route, reply_vec)?; out = self.wrap_with_route(Some(sr), private_route, reply_vec)?;
// first // first
out_node_id = sr out_node_id = sr
.hops .hops
.first() .first()
.ok_or_else(|| rpc_error_internal("no hop in safety route"))? .ok_or_else(|| rpc_error_internal("no hop in safety route"))
.map_err(logthru_rpc!())?
.dial_info .dial_info
.node_id .node_id
.key; .key;
@ -618,7 +627,10 @@ impl RPCProcessor {
// Extract private route for reply // Extract private route for reply
let private_route = match pr { let private_route = match pr {
Ok(v) => v, Ok(v) => v,
Err(_) => return Err(rpc_error_internal("invalid private route")), Err(_) => {
return Err(rpc_error_internal("invalid private route"))
.map_err(logthru_rpc!())
}
}; };
// Reply with 'route' operation // Reply with 'route' operation
@ -627,23 +639,27 @@ impl RPCProcessor {
None => { None => {
// If no safety route, the first node is the first hop of the private route // If no safety route, the first node is the first hop of the private route
if !private_route.has_first_hop() { if !private_route.has_first_hop() {
return Err(rpc_error_internal("private route has no hops")); return Err(rpc_error_internal("private route has no hops"))
.map_err(logthru_rpc!());
} }
let hop = private_route let hop = private_route
.get_first_hop() .get_first_hop()
.map_err(map_error_internal!("not a valid first hop"))?; .map_err(map_error_internal!("not a valid first hop"))?;
decode_public_key( decode_public_key(
&hop.get_dial_info() &hop.get_dial_info()
.map_err(map_error_internal!("not a valid dial info"))? .map_err(map_error_internal!("not a valid dial info"))
.map_err(logthru_rpc!())?
.get_node_id() .get_node_id()
.map_err(map_error_internal!("not a valid node id"))?, .map_err(map_error_internal!("not a valid node id"))
.map_err(logthru_rpc!())?,
) )
} }
Some(sr) => { Some(sr) => {
// If safety route is in use, first node is the first hop of the safety route // If safety route is in use, first node is the first hop of the safety route
sr.hops sr.hops
.first() .first()
.ok_or_else(|| rpc_error_internal("no hop in safety route"))? .ok_or_else(|| rpc_error_internal("no hop in safety route"))
.map_err(logthru_rpc!())?
.dial_info .dial_info
.node_id .node_id
.key .key
@ -774,7 +790,8 @@ impl RPCProcessor {
let operation = rpcreader let operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
// Don't bother unless we are going to answer // Don't bother unless we are going to answer
if !self.wants_answer(&operation)? { if !self.wants_answer(&operation)? {
@ -813,7 +830,8 @@ impl RPCProcessor {
let operation = rpcreader let operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
// This should never want an answer // This should never want an answer
if self.wants_answer(&operation)? { if self.wants_answer(&operation)? {
@ -902,11 +920,12 @@ impl RPCProcessor {
let operation = rpcreader let operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
// find_node must always want an answer // find_node must always want an answer
if !self.wants_answer(&operation)? { if !self.wants_answer(&operation)? {
return Err(RPCError::InvalidFormat); return Err(RPCError::InvalidFormat).map_err(logthru_rpc!());
} }
// get findNodeQ reader // get findNodeQ reader
@ -916,12 +935,17 @@ impl RPCProcessor {
}; };
// ensure find_node peerinfo matches the envelope // ensure find_node peerinfo matches the envelope
let target_node_id = let target_node_id = decode_public_key(
decode_public_key(&fnq_reader.get_node_id().map_err(map_error_capnp_error!())?); &fnq_reader
.get_node_id()
.map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?,
);
let peer_info = decode_peer_info( let peer_info = decode_peer_info(
&fnq_reader &fnq_reader
.get_peer_info() .get_peer_info()
.map_err(map_error_capnp_error!())?, .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?,
)?; )?;
if peer_info.node_id.key != rpcreader.header.envelope.get_sender_id() { if peer_info.node_id.key != rpcreader.header.envelope.get_sender_id() {
return Err(RPCError::InvalidFormat); return Err(RPCError::InvalidFormat);
@ -958,7 +982,7 @@ impl RPCProcessor {
// transform // transform
|e| RoutingTable::transform_to_peer_info(e, peer_scope, &own_peer_info), |e| RoutingTable::transform_to_peer_info(e, peer_scope, &own_peer_info),
); );
trace!(">>>> Returning {} closest peers", closest_nodes.len()); log_rpc!(">>>> Returning {} closest peers", closest_nodes.len());
// Send find_node answer // Send find_node answer
let mut reply_msg = ::capnp::message::Builder::new_default(); let mut reply_msg = ::capnp::message::Builder::new_default();
@ -1022,7 +1046,8 @@ impl RPCProcessor {
let operation = rpcreader let operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
// This should never want an answer // This should never want an answer
if self.wants_answer(&operation)? { if self.wants_answer(&operation)? {
@ -1072,7 +1097,8 @@ impl RPCProcessor {
let operation = rpcreader let operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
operation.get_op_id() operation.get_op_id()
}; };
@ -1091,7 +1117,8 @@ impl RPCProcessor {
let operation = rpcreader let operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
match operation match operation
.get_detail() .get_detail()
@ -1300,11 +1327,13 @@ impl RPCProcessor {
let response_operation = rpcreader let response_operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
let info_a = match response_operation let info_a = match response_operation
.get_detail() .get_detail()
.which() .which()
.map_err(map_error_capnp_notinschema!())? .map_err(map_error_capnp_notinschema!())
.map_err(logthru_rpc!())?
{ {
veilid_capnp::operation::detail::InfoA(a) => { veilid_capnp::operation::detail::InfoA(a) => {
a.map_err(map_error_internal!("Invalid InfoA"))? a.map_err(map_error_internal!("Invalid InfoA"))?
@ -1450,11 +1479,13 @@ impl RPCProcessor {
let response_operation = rpcreader let response_operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())?; .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
let find_node_a = match response_operation let find_node_a = match response_operation
.get_detail() .get_detail()
.which() .which()
.map_err(map_error_capnp_notinschema!())? .map_err(map_error_capnp_notinschema!())
.map_err(logthru_rpc!())?
{ {
veilid_capnp::operation::detail::FindNodeA(a) => { veilid_capnp::operation::detail::FindNodeA(a) => {
a.map_err(map_error_internal!("Invalid FindNodeA"))? a.map_err(map_error_internal!("Invalid FindNodeA"))?

View File

@ -586,7 +586,7 @@ impl core::str::FromStr for NodeDialInfoSingle {
cur_url = &cur_url[6..]; cur_url = &cur_url[6..];
proto = ProtocolType::WSS; proto = ProtocolType::WSS;
} else { } else {
return Err("unknown protocol".to_owned()); return Err(format!("unknown protocol: {}", url));
} }
// parse out node id if we have one // parse out node id if we have one

View File

@ -1,3 +1,5 @@
pub use alloc::string::{String, ToString};
pub fn map_to_string<X: ToString>(arg: X) -> String { pub fn map_to_string<X: ToString>(arg: X) -> String {
arg.to_string() arg.to_string()
} }
@ -31,16 +33,16 @@ macro_rules! log_net {
"{}", "{}",
$text, $text,
)}; )};
(error $fmt:literal, $($arg:expr)+) => { (error $fmt:literal, $($arg:expr),+) => {
error!(target:"net", $fmt, $($arg)+); error!(target:"net", $fmt, $($arg),+);
}; };
($text:expr) => {trace!( ($text:expr) => {trace!(
target: "net", target: "net",
"{}", "{}",
$text, $text,
)}; )};
($fmt:literal, $($arg:expr)+) => { ($fmt:literal, $($arg:expr),+) => {
trace!(target:"net", $fmt, $($arg)+); trace!(target:"net", $fmt, $($arg),+);
} }
} }
@ -51,16 +53,16 @@ macro_rules! log_rpc {
"{}", "{}",
$text, $text,
)}; )};
(error $fmt:literal, $($arg:expr)+) => { (error $fmt:literal, $($arg:expr),+) => {
error!(target:"rpc", $fmt, $($arg)+); error!(target:"rpc", $fmt, $($arg),+);
}; };
($text:expr) => {trace!( ($text:expr) => {trace!(
target: "rpc", target: "rpc",
"{}", "{}",
$text, $text,
)}; )};
($fmt:literal, $($arg:expr)+) => { ($fmt:literal, $($arg:expr),+) => {
trace!(target:"rpc", $fmt, $($arg)+); trace!(target:"rpc", $fmt, $($arg),+);
} }
} }
@ -71,16 +73,16 @@ macro_rules! log_rtab {
"{}", "{}",
$text, $text,
)}; )};
(error $fmt:literal, $($arg:expr)+) => { (error $fmt:literal, $($arg:expr),+) => {
error!(target:"rtab", $fmt, $($arg)+); error!(target:"rtab", $fmt, $($arg),+);
}; };
($text:expr) => {trace!( ($text:expr) => {trace!(
target: "rtab", target: "rtab",
"{}", "{}",
$text, $text,
)}; )};
($fmt:literal, $($arg:expr)+) => { ($fmt:literal, $($arg:expr),+) => {
trace!(target:"rtab", $fmt, $($arg)+); trace!(target:"rtab", $fmt, $($arg),+);
} }
} }
@ -92,8 +94,8 @@ macro_rules! logthru_net {
($($level:ident)? $text:literal) => { ($($level:ident)? $text:literal) => {
logthru!($($level)? "net", $text) logthru!($($level)? "net", $text)
}; };
($($level:ident)? $fmt:literal, $($arg:expr)+) => { ($($level:ident)? $fmt:literal, $($arg:expr),+) => {
logthru!($($level)? "net", $fmt, $($arg)+) logthru!($($level)? "net", $fmt, $($arg),+)
} }
} }
#[macro_export] #[macro_export]
@ -104,8 +106,8 @@ macro_rules! logthru_rpc {
($($level:ident)? $text:literal) => { ($($level:ident)? $text:literal) => {
logthru!($($level)? "rpc", $text) logthru!($($level)? "rpc", $text)
}; };
($($level:ident)? $fmt:literal, $($arg:expr)+) => { ($($level:ident)? $fmt:literal, $($arg:expr),+) => {
logthru!($($level)? "rpc", $fmt, $($arg)+) logthru!($($level)? "rpc", $fmt, $($arg),+)
} }
} }
@ -117,8 +119,8 @@ macro_rules! logthru_rtab {
($($level:ident)? $text:literal) => { ($($level:ident)? $text:literal) => {
logthru!($($level)? "rtab", $text) logthru!($($level)? "rtab", $text)
}; };
($($level:ident)? $fmt:literal, $($arg:expr)+) => { ($($level:ident)? $fmt:literal, $($arg:expr),+) => {
logthru!($($level)? "rtab", $fmt, $($arg)+) logthru!($($level)? "rtab", $fmt, $($arg),+)
} }
} }
@ -142,12 +144,12 @@ macro_rules! logthru {
); );
e__ e__
}); });
(error $target:literal, $fmt:literal, $($arg:expr)+) => (|e__| { (error $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
error!( error!(
target: $target, target: $target,
concat!("[{}] ", $fmt), concat!("[{}] ", $fmt),
e__, e__,
$($arg)+ $($arg),+
); );
e__ e__
}); });
@ -169,12 +171,12 @@ macro_rules! logthru {
); );
e__ e__
}); });
($target:literal, $fmt:literal, $($arg:expr)+) => (|e__| { ($target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
trace!( trace!(
target: $target, target: $target,
concat!("[{}] ", $fmt), concat!("[{}] ", $fmt),
e__, e__,
$($arg)+ $($arg),+
); );
e__ e__
}) })

View File

@ -8,6 +8,9 @@
// Only IP address and DNS hostname host fields are supported // Only IP address and DNS hostname host fields are supported
use super::IpAddr; use super::IpAddr;
use alloc::borrow::ToOwned;
use alloc::string::String;
use alloc::vec::Vec;
use core::fmt; use core::fmt;
use core::str::FromStr; use core::str::FromStr;