logging refactor

This commit is contained in:
John Smith 2021-12-16 21:57:28 -05:00
parent 3b54c2f8bd
commit 46f8607998
16 changed files with 847 additions and 372 deletions

1
Cargo.lock generated
View File

@ -3651,6 +3651,7 @@ dependencies = [
"capnpc", "capnpc",
"cfg-if 0.1.10", "cfg-if 0.1.10",
"chacha20poly1305", "chacha20poly1305",
"chrono",
"config 0.11.0", "config 0.11.0",
"console_error_panic_hook", "console_error_panic_hook",
"curve25519-dalek-ng", "curve25519-dalek-ng",

View File

@ -66,6 +66,7 @@ if-addrs = { path = "../external/if-addrs" }
async_executors = { version = "^0", features = [ "async_std" ]} async_executors = { version = "^0", features = [ "async_std" ]}
socket2 = "^0" socket2 = "^0"
bugsalot = "^0" bugsalot = "^0"
chrono = "^0"
# Dependencies for WASM builds only # Dependencies for WASM builds only
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]

View File

@ -14,10 +14,10 @@ impl DummyNetworkConnection {
pub fn protocol_type(&self) -> ProtocolType { pub fn protocol_type(&self) -> ProtocolType {
ProtocolType::UDP ProtocolType::UDP
} }
pub fn send(&self, _message: Vec<u8>) -> SystemPinBoxFuture<Result<(), ()>> { pub fn send(&self, _message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
Box::pin(async { Ok(()) }) Box::pin(async { Ok(()) })
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, ()>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
Box::pin(async { Ok(Vec::new()) }) Box::pin(async { Ok(Vec::new()) })
} }
} }
@ -42,7 +42,7 @@ impl NetworkConnection {
Self::Wss(w) => w.protocol_type(), Self::Wss(w) => w.protocol_type(),
} }
} }
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), ()>> { pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
match self { match self {
Self::Dummy(d) => d.send(message), Self::Dummy(d) => d.send(message),
Self::RawTcp(t) => t.send(message), Self::RawTcp(t) => t.send(message),
@ -51,7 +51,7 @@ impl NetworkConnection {
Self::Wss(w) => w.send(message), Self::Wss(w) => w.send(message),
} }
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, ()>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
match self { match self {
Self::Dummy(d) => d.recv(), Self::Dummy(d) => d.recv(),
Self::RawTcp(t) => t.recv(), Self::RawTcp(t) => t.recv(),

View File

@ -49,40 +49,58 @@ impl RawTcpNetworkConnection {
ProtocolType::TCP ProtocolType::TCP
} }
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), ()>> { pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
return Err(()); return Err("sending too large TCP message".to_owned());
} }
let len = message.len() as u16; let len = message.len() as u16;
let header = [b'V', b'L', len as u8, (len >> 8) as u8]; let header = [b'V', b'L', len as u8, (len >> 8) as u8];
let mut inner = inner.lock().await; let mut inner = inner.lock().await;
inner.stream.write_all(&header).await.map_err(drop)?; inner
inner.stream.write_all(&message).await.map_err(drop) .stream
.write_all(&header)
.await
.map_err(map_to_string)
.map_err(logthru_net!())?;
inner
.stream
.write_all(&message)
.await
.map_err(map_to_string)
.map_err(logthru_net!())
}) })
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, ()>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
let mut header = [0u8; 4]; let mut header = [0u8; 4];
let mut inner = inner.lock().await; let mut inner = inner.lock().await;
inner.stream.read_exact(&mut header).await.map_err(drop)?; inner
.stream
.read_exact(&mut header)
.await
.map_err(|e| format!("TCP recv error: {}", e))?;
if header[0] != b'V' || header[1] != b'L' { if header[0] != b'V' || header[1] != b'L' {
return Err(()); return Err("received invalid TCP frame header".to_owned());
} }
let len = ((header[3] as usize) << 8) | (header[2] as usize); let len = ((header[3] as usize) << 8) | (header[2] as usize);
if len > MAX_MESSAGE_SIZE { if len > MAX_MESSAGE_SIZE {
return Err(()); return Err("received too large TCP frame".to_owned());
} }
let mut out: Vec<u8> = vec![0u8; len]; let mut out: Vec<u8> = vec![0u8; len];
inner.stream.read_exact(&mut out).await.map_err(drop)?; inner
.stream
.read_exact(&mut out)
.await
.map_err(map_to_string)?;
Ok(out) Ok(out)
}) })
} }
@ -130,7 +148,8 @@ impl RawTcpProtocolHandler {
let peeklen = stream let peeklen = stream
.peek(&mut peekbuf) .peek(&mut peekbuf)
.await .await
.map_err(|e| format!("could not peek tcp stream: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!("could not peek tcp stream"))?;
assert_eq!(peeklen, PEEK_DETECT_LEN); assert_eq!(peeklen, PEEK_DETECT_LEN);
let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)); let conn = NetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream));
@ -159,20 +178,21 @@ impl RawTcpProtocolHandler {
// for hole-punch compatibility // for hole-punch compatibility
let domain = Domain::for_address(remote_socket_addr); let domain = Domain::for_address(remote_socket_addr);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(|e| format!("could not create tcp socket: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!())?;
if let Err(e) = socket.set_linger(None) { if let Err(e) = socket.set_linger(None) {
warn!("Couldn't set TCP linger: {}", e); log_net!("Couldn't set TCP linger: {}", e);
} }
if let Err(e) = socket.set_nodelay(true) { if let Err(e) = socket.set_nodelay(true) {
warn!("Couldn't set TCP nodelay: {}", e); log_net!("Couldn't set TCP nodelay: {}", e);
} }
if let Err(e) = socket.set_reuse_address(true) { if let Err(e) = socket.set_reuse_address(true) {
warn!("Couldn't set reuse address: {}", e); log_net!("Couldn't set reuse address: {}", e);
} }
cfg_if! { cfg_if! {
if #[cfg(unix)] { if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) { if let Err(e) = socket.set_reuse_port(true) {
warn!("Couldn't set reuse port: {}", e); log_net!("Couldn't set reuse port: {}", e);
} }
} }
} }
@ -181,7 +201,7 @@ impl RawTcpProtocolHandler {
if let Some(some_local_addr) = preferred_local_address { if let Some(some_local_addr) = preferred_local_address {
let socket2_addr = socket2::SockAddr::from(some_local_addr); let socket2_addr = socket2::SockAddr::from(some_local_addr);
if let Err(e) = socket.bind(&socket2_addr) { if let Err(e) = socket.bind(&socket2_addr) {
warn!("failed to bind TCP socket: {}", e); log_net!(error "failed to bind TCP socket: {}", e);
} }
} }
@ -189,14 +209,16 @@ impl RawTcpProtocolHandler {
let remote_socket2_addr = socket2::SockAddr::from(remote_socket_addr); let remote_socket2_addr = socket2::SockAddr::from(remote_socket_addr);
socket socket
.connect(&remote_socket2_addr) .connect(&remote_socket2_addr)
.map_err(|e| format!("couln't connect tcp: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!("addr={}", remote_socket_addr))?;
let std_stream: std::net::TcpStream = socket.into(); let std_stream: std::net::TcpStream = socket.into();
let ts = TcpStream::from(std_stream); let ts = TcpStream::from(std_stream);
// See what local address we ended up with and turn this into a stream // See what local address we ended up with and turn this into a stream
let local_address = ts let local_address = ts
.local_addr() .local_addr()
.map_err(|e| format!("couldn't get local address for tcp socket: {}", e))?; .map_err(map_to_string)
.map_err(logthru_net!())?;
let ps = AsyncPeekStream::new(ts); let ps = AsyncPeekStream::new(ts);
let peer_addr = PeerAddress::new( let peer_addr = PeerAddress::new(
Address::from_socket_addr(remote_socket_addr).to_canonical(), Address::from_socket_addr(remote_socket_addr).to_canonical(),
@ -215,9 +237,12 @@ impl RawTcpProtocolHandler {
Ok(conn) Ok(conn)
} }
pub async fn send_unbound_message(data: Vec<u8>, socket_addr: SocketAddr) -> Result<(), ()> { pub async fn send_unbound_message(
data: Vec<u8>,
socket_addr: SocketAddr,
) -> Result<(), String> {
if data.len() > MAX_MESSAGE_SIZE { if data.len() > MAX_MESSAGE_SIZE {
return Err(()); return Err("sending too large unbound TCP message".to_owned());
} }
trace!( trace!(
"sending unbound message of length {} to {}", "sending unbound message of length {} to {}",
@ -225,8 +250,10 @@ impl RawTcpProtocolHandler {
socket_addr socket_addr
); );
let mut stream = TcpStream::connect(socket_addr).await.map_err(drop)?; let mut stream = TcpStream::connect(socket_addr)
stream.write_all(&data).await.map_err(drop) .await
.map_err(|e| format!("{}", e))?;
stream.write_all(&data).await.map_err(|e| format!("{}", e))
} }
} }

View File

@ -30,9 +30,9 @@ impl RawUdpProtocolHandler {
} }
} }
pub async fn on_message(&self, data: &[u8], remote_addr: SocketAddr) -> Result<bool, ()> { pub async fn on_message(&self, data: &[u8], remote_addr: SocketAddr) -> Result<bool, String> {
if data.len() > MAX_MESSAGE_SIZE { if data.len() > MAX_MESSAGE_SIZE {
return Err(()); return Err("received too large UDP message".to_owned());
} }
trace!( trace!(
@ -52,7 +52,7 @@ impl RawUdpProtocolHandler {
remote_addr.port(), remote_addr.port(),
ProtocolType::UDP, ProtocolType::UDP,
); );
let local_socket_addr = socket.local_addr().map_err(drop)?; let local_socket_addr = socket.local_addr().map_err(|e| format!("{}", e))?;
network_manager network_manager
.on_recv_envelope( .on_recv_envelope(
data, data,
@ -61,9 +61,9 @@ impl RawUdpProtocolHandler {
.await .await
} }
pub async fn send_message(&self, data: Vec<u8>, socket_addr: SocketAddr) -> Result<(), ()> { pub async fn send_message(&self, data: Vec<u8>, socket_addr: SocketAddr) -> Result<(), String> {
if data.len() > MAX_MESSAGE_SIZE { if data.len() > MAX_MESSAGE_SIZE {
return Err(()); return Err("sending too large UDP message".to_owned());
} }
trace!( trace!(
@ -73,18 +73,25 @@ impl RawUdpProtocolHandler {
); );
let socket = self.inner.lock().socket.clone(); let socket = self.inner.lock().socket.clone();
let len = socket.send_to(&data, socket_addr).await.map_err(drop)?; let len = socket
.send_to(&data, socket_addr)
.await
.map_err(|e| format!("{}", e))?;
if len != data.len() { if len != data.len() {
Err(()) Err("UDP partial send".to_owned())
} else { } else {
Ok(()) Ok(())
} }
} }
pub async fn send_unbound_message(data: Vec<u8>, socket_addr: SocketAddr) -> Result<(), ()> { pub async fn send_unbound_message(
data: Vec<u8>,
socket_addr: SocketAddr,
) -> Result<(), String> {
if data.len() > MAX_MESSAGE_SIZE { if data.len() > MAX_MESSAGE_SIZE {
return Err(()); return Err("sending too large unbound UDP message".to_owned());
} }
xxx continue here
trace!( trace!(
"sending unbound message of length {} to {}", "sending unbound message of length {} to {}",
data.len(), data.len(),
@ -98,10 +105,15 @@ impl RawUdpProtocolHandler {
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0) SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0)
} }
}; };
let socket = UdpSocket::bind(local_socket_addr).await.map_err(drop)?; let socket = UdpSocket::bind(local_socket_addr)
let len = socket.send_to(&data, socket_addr).await.map_err(drop)?; .await
.map_err(|e| format!("{}", e))?;
let len = socket
.send_to(&data, socket_addr)
.await
.map_err(|e| format!("{}", e))?;
if len != data.len() { if len != data.len() {
Err(()) Err("UDP partial unbound send".to_owned())
} else { } else {
Ok(()) Ok(())
} }

View File

@ -87,22 +87,23 @@ where
ProtocolType::WS ProtocolType::WS
} }
} }
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), ()>> {
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
return Err(()); return Err("received too large WS message".to_owned());
} }
let mut inner = inner.lock().await; let mut inner = inner.lock().await;
inner inner
.ws_stream .ws_stream
.send(Message::binary(message)) .send(Message::binary(message))
.await .await
.map_err(drop) .map_err(map_to_string)
}) })
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, ()>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
@ -110,13 +111,18 @@ where
let out = match inner.ws_stream.next().await { let out = match inner.ws_stream.next().await {
Some(Ok(Message::Binary(v))) => v, Some(Ok(Message::Binary(v))) => v,
_ => { Some(Ok(_)) => {
trace!("websocket recv failed"); return Err("Unexpected WS message type".to_owned());
return Err(()); }
Some(Err(e)) => {
return Err(e.to_string());
}
None => {
return Err("WS stream closed".to_owned());
} }
}; };
if out.len() > MAX_MESSAGE_SIZE { if out.len() > MAX_MESSAGE_SIZE {
Err(()) Err("sending too large WS message".to_owned())
} else { } else {
Ok(out) Ok(out)
} }
@ -193,17 +199,15 @@ impl WebsocketProtocolHandler {
&& peekbuf[request_path_len - 1] == b' ')); && peekbuf[request_path_len - 1] == b' '));
if !matches_path { if !matches_path {
trace!("not websocket"); log_net!("not websocket");
return Ok(false); return Ok(false);
} }
trace!("found websocket"); log_net!("found websocket");
let ws_stream = match accept_async(ps).await { let ws_stream = accept_async(ps)
Ok(s) => s, .await
Err(e) => { .map_err(map_to_string)
return Err(format!("failed websockets handshake: {:?}", e)); .map_err(logthru_net!("failed websockets handshake"))?;
}
};
// Wrap the websocket in a NetworkConnection and register it // Wrap the websocket in a NetworkConnection and register it
let protocol_type = if self.inner.tls { let protocol_type = if self.inner.tls {

View File

@ -11,6 +11,11 @@ pub fn get_timestamp() -> u64 {
} }
} }
pub fn get_timestamp_string() -> String {
let dt = chrono::Utc::now();
dt.time().format("%H:%M:%S.3f").to_string()
}
pub fn random_bytes(dest: &mut [u8]) -> Result<(), String> { pub fn random_bytes(dest: &mut [u8]) -> Result<(), String> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
rng.try_fill_bytes(dest).map_err(|err| format!("{:?}", err)) rng.try_fill_bytes(dest).map_err(|err| format!("{:?}", err))

View File

@ -33,13 +33,13 @@ impl NetworkConnection {
Self::WS(w) => w.protocol_type(), Self::WS(w) => w.protocol_type(),
} }
} }
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), ()>> { pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
match self { match self {
Self::Dummy(d) => d.send(message), Self::Dummy(d) => d.send(message),
Self::WS(w) => w.send(message), Self::WS(w) => w.send(message),
} }
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, ()>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String>> {
match self { match self {
Self::Dummy(d) => d.recv(), Self::Dummy(d) => d.recv(),
Self::WS(w) => w.recv(), Self::WS(w) => w.recv(),

View File

@ -52,27 +52,32 @@ impl WebsocketNetworkConnection {
ProtocolType::WS ProtocolType::WS
} }
} }
pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), ()>> { pub fn send(&self, message: Vec<u8>) -> SystemPinBoxFuture<Result<(), String>> {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
return Err(()); return Err("sending too large WS message".to_owned());
} }
inner.lock().ws.send_with_u8_array(&message).map_err(drop) inner.lock().ws.send_with_u8_array(&message).map_err(map_to_string)
}) })
} }
pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, ()>> { pub fn recv(&self) -> SystemPinBoxFuture<Result<Vec<u8>, String)>> {
let inner = self.inner.clone(); let inner = self.inner.clone();
Box::pin(async move { Box::pin(async move {
let out = match inner.lock().ws_stream.next().await { let out = match inner.lock().ws_stream.next().await {
Some(WsMessage::Binary(v)) => v, Some(WsMessage::Binary(v)) => v,
_ => { Some(_) => {
trace!("websocket recv failed"); return Err("Unexpected WS message type".to_owned());
return Err(()); }
Some(Err(e)) => {
return Err(|e| e.to_string());
}
None => {
return Err("WS stream closed".to_owned());
} }
}; };
if out.len() > MAX_MESSAGE_SIZE { if out.len() > MAX_MESSAGE_SIZE {
Err(()) Err("sending too large WS message".to_owned())
} else { } else {
Ok(out) Ok(out)
} }

View File

@ -28,6 +28,18 @@ pub fn get_timestamp() -> u64 {
} }
} }
pub fn get_timestamp_string() -> String {
let date = Date::now();
let hours = Date::get_utc_hours(date);
let minutes = Date::get_utc_minutes(date);
let seconds = Date::get_utc_seconds(date);
let milliseconds = Date::get_utc_milliseconds(date);
format!(
"{:02}:{:02}:{:02}.{}",
hours, minutes, seconds, milliseconds
)
}
pub fn random_bytes(dest: &mut [u8]) -> Result<(), String> { pub fn random_bytes(dest: &mut [u8]) -> Result<(), String> {
let len = dest.len(); let len = dest.len();
let u32len = len / 4; let u32len = len / 4;

View File

@ -275,14 +275,14 @@ impl NetworkManager {
.lock() .lock()
.connection_add_channel_tx .connection_add_channel_tx
.as_ref() .as_ref()
.ok_or_else(|| "connection channel isn't open yet".to_owned())? .ok_or_else(fn_string!("connection channel isn't open yet"))?
.clone(); .clone();
let this = self.clone(); let this = self.clone();
let receiver_loop_future = Self::process_connection(this, descriptor, conn); let receiver_loop_future = Self::process_connection(this, descriptor, conn);
Ok(tx tx.try_send(receiver_loop_future)
.try_send(receiver_loop_future)
.await .await
.map_err(|_| "connection loop stopped".to_owned())?) .map_err(map_to_string)
.map_err(logthru_net!(error "failed to start receiver loop"))
} }
// Connection receiver loop // Connection receiver loop
@ -299,7 +299,7 @@ impl NetworkManager {
{ {
Ok(e) => e, Ok(e) => e,
Err(err) => { Err(err) => {
error!("{}", err); error!(target: "net", "{}", err);
return; return;
} }
}; };
@ -323,7 +323,10 @@ impl NetworkManager {
}; };
match this.on_recv_envelope(message.as_slice(), &descriptor).await { match this.on_recv_envelope(message.as_slice(), &descriptor).await {
Ok(_) => (), Ok(_) => (),
Err(_) => break, Err(e) => {
error!("{}", e);
break;
}
}; };
} }
@ -527,19 +530,16 @@ impl NetworkManager {
&self, &self,
data: &[u8], data: &[u8],
descriptor: &ConnectionDescriptor, descriptor: &ConnectionDescriptor,
) -> Result<bool, ()> { ) -> Result<bool, String> {
// Is this an out-of-band receipt instead of an envelope? // Is this an out-of-band receipt instead of an envelope?
if data[0..4] == *RECEIPT_MAGIC { if data[0..4] == *RECEIPT_MAGIC {
self.process_receipt(data).await.map_err(|s| { self.process_receipt(data).await?;
trace!("receipt failed to process: {}", s);
})?;
return Ok(true); return Ok(true);
} }
// Decode envelope header // Decode envelope header
let envelope = Envelope::from_data(data).map_err(|_| { let envelope =
trace!("envelope failed to decode"); Envelope::from_data(data).map_err(|_| "envelope failed to decode".to_owned())?;
})?;
// Get routing table and rpc processor // Get routing table and rpc processor
let (routing_table, lease_manager, rpc) = { let (routing_table, lease_manager, rpc) = {
@ -560,22 +560,22 @@ impl NetworkManager {
if !lease_manager.server_has_valid_relay_lease(&recipient_id) if !lease_manager.server_has_valid_relay_lease(&recipient_id)
&& !lease_manager.server_has_valid_relay_lease(&sender_id) && !lease_manager.server_has_valid_relay_lease(&sender_id)
{ {
trace!("received envelope not intended for this node"); return Err("received envelope not intended for this node".to_owned());
return Err(());
} }
// Resolve the node to send this to // Resolve the node to send this to
let relay_nr = rpc.resolve_node(recipient_id).await.map_err(|_| { let relay_nr = rpc.resolve_node(recipient_id).await.map_err(|e| {
trace!("failed to resolve recipient node for relay, dropping packet..."); format!(
"failed to resolve recipient node for relay, dropping packet...: {:?}",
e
)
})?; })?;
// Re-send the packet to the leased node // Re-send the packet to the leased node
self.net() self.net()
.send_data(relay_nr, data.to_vec()) .send_data(relay_nr, data.to_vec())
.await .await
.map_err(|_| { .map_err(|e| format!("failed to forward envelope: {}", e))?;
trace!("failed to forward envelope");
})?;
// Inform caller that we dealt with the envelope, but did not process it locally // Inform caller that we dealt with the envelope, but did not process it locally
return Ok(false); return Ok(false);
} }
@ -585,7 +585,9 @@ impl NetworkManager {
// Decrypt the envelope body // Decrypt the envelope body
// xxx: punish nodes that send messages that fail to decrypt eventually // xxx: punish nodes that send messages that fail to decrypt eventually
let body = envelope.decrypt_body(self.crypto(), data, &node_id_secret)?; let body = envelope
.decrypt_body(self.crypto(), data, &node_id_secret)
.map_err(|_| "failed to decrypt envelope body".to_owned())?;
// Get timestamp range // Get timestamp range
let (tsbehind, tsahead) = { let (tsbehind, tsahead) = {
@ -601,20 +603,18 @@ impl NetworkManager {
let ets = envelope.get_timestamp(); let ets = envelope.get_timestamp();
if let Some(tsbehind) = tsbehind { if let Some(tsbehind) = tsbehind {
if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) { if tsbehind > 0 && (ts > ets && ts - ets > tsbehind) {
trace!( return Err(format!(
"envelope time was too far in the past: {}ms ", "envelope time was too far in the past: {}ms ",
timestamp_to_secs(ts - ets) * 1000f64 timestamp_to_secs(ts - ets) * 1000f64
); ));
return Err(());
} }
} }
if let Some(tsahead) = tsahead { if let Some(tsahead) = tsahead {
if tsahead > 0 && (ts < ets && ets - ts > tsahead) { if tsahead > 0 && (ts < ets && ets - ts > tsahead) {
trace!( return Err(format!(
"envelope time was too far in the future: {}ms", "envelope time was too far in the future: {}ms",
timestamp_to_secs(ets - ts) * 1000f64 timestamp_to_secs(ets - ts) * 1000f64
); ));
return Err(());
} }
} }
@ -625,9 +625,7 @@ impl NetworkManager {
descriptor.clone(), descriptor.clone(),
ts, ts,
) )
.map_err(|_| { .map_err(|e| format!("node id registration failed: {}", e))?;
trace!("node id registration failed");
})?;
source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version())); source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version()));
// xxx: deal with spoofing and flooding here? // xxx: deal with spoofing and flooding here?
@ -635,9 +633,7 @@ impl NetworkManager {
// Pass message to RPC system // Pass message to RPC system
rpc.enqueue_message(envelope, body, source_noderef) rpc.enqueue_message(envelope, body, source_noderef)
.await .await
.map_err(|_| { .map_err(|e| format!("enqueing rpc message failed: {}", e))?;
trace!("enqueing rpc message failed");
})?;
// Inform caller that we dealt with the envelope locally // Inform caller that we dealt with the envelope locally
Ok(true) Ok(true)

View File

@ -0,0 +1,282 @@
use super::*;
#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)]
pub enum RPCError {
Timeout,
InvalidFormat,
Unimplemented(String),
Protocol(String),
Internal(String),
}
pub fn rpc_error_internal<T: AsRef<str>>(x: T) -> RPCError {
error!("RPCError Internal: {}", x.as_ref());
RPCError::Internal(x.as_ref().to_owned())
}
pub fn rpc_error_capnp_error(e: capnp::Error) -> RPCError {
error!("RPCError Protocol: {}", &e.description);
RPCError::Protocol(e.description)
}
pub fn rpc_error_capnp_notinschema(e: capnp::NotInSchema) -> RPCError {
error!("RPCError Protocol: not in schema: {}", &e.0);
RPCError::Protocol(format!("not in schema: {}", &e.0))
}
pub fn rpc_error_unimplemented<T: AsRef<str>>(x: T) -> RPCError {
error!("RPCError Unimplemented: {}", x.as_ref());
RPCError::Unimplemented(x.as_ref().to_owned())
}
impl fmt::Display for RPCError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RPCError::Timeout => write!(f, "[RPCError: Timeout]"),
RPCError::InvalidFormat => write!(f, "[RPCError: InvalidFormat]"),
RPCError::Unimplemented(s) => write!(f, "[RPCError: Unimplemented({})]", s),
RPCError::Protocol(s) => write!(f, "[RPCError: Protocol({})]", s),
RPCError::Internal(s) => write!(f, "[RPCError: Internal({})]", s),
}
}
}
#[macro_export]
macro_rules! map_error_internal {
($x:expr) => {
|_| rpc_error_internal($x)
};
}
#[macro_export]
macro_rules! map_error_string {
() => {
|s| rpc_error_internal(&s)
};
}
#[macro_export]
macro_rules! map_error_capnp_error {
() => {
|e| rpc_error_capnp_error(e)
};
}
#[macro_export]
macro_rules! map_error_capnp_notinschema {
() => {
|e| rpc_error_capnp_notinschema(e)
};
}
#[macro_export]
macro_rules! map_error_panic {
() => {
|_| panic!("oops")
};
}
impl RPCProcessor {
pub(super) fn get_rpc_request_debug_info<T: capnp::message::ReaderSegments>(
&self,
dest: &Destination,
message: &capnp::message::Reader<T>,
safety_route_spec: &Option<&SafetyRouteSpec>,
) -> String {
format!(
"REQ->{:?}{} {}",
dest,
match safety_route_spec {
None => "".to_owned(),
Some(srs) => format!("[{:?}]", srs),
},
self.get_rpc_message_debug_info(message)
)
}
pub(super) fn get_rpc_reply_debug_info<T: capnp::message::ReaderSegments>(
&self,
request_rpcreader: &RPCMessageReader,
reply_msg: &capnp::message::Reader<T>,
safety_route_spec: &Option<&SafetyRouteSpec>,
) -> String {
let request_operation = match request_rpcreader
.reader
.get_root::<veilid_capnp::operation::Reader>()
{
Ok(v) => v,
Err(e) => {
return format!("invalid operation: {}", e);
}
};
let respond_to = match request_operation.get_respond_to().which() {
Ok(v) => v,
Err(e) => {
return format!("(respond_to not in schema: {:?})", e);
}
};
let respond_to_str = match respond_to {
veilid_capnp::operation::respond_to::None(_) => "(None)".to_owned(),
veilid_capnp::operation::respond_to::Sender(_) => "Sender".to_owned(),
veilid_capnp::operation::respond_to::PrivateRoute(pr) => {
let pr_reader = match pr {
Ok(prr) => prr,
Err(e) => {
return e.to_string();
}
};
let private_route = match decode_private_route(&pr_reader) {
Ok(pr) => pr,
Err(e) => {
return e.to_string();
}
};
format!("[PR:{:?}]", private_route)
}
};
format!(
"REPLY->{:?}{} {}",
respond_to_str,
match safety_route_spec {
None => "".to_owned(),
Some(srs) => format!("[SR:{:?}]", srs),
},
self.get_rpc_message_debug_info(reply_msg)
)
}
pub(super) fn get_rpc_message_debug_info<T: capnp::message::ReaderSegments>(
&self,
message: &capnp::message::Reader<T>,
) -> String {
let operation = match message.get_root::<veilid_capnp::operation::Reader>() {
Ok(v) => v,
Err(e) => {
return format!("invalid operation: {}", e);
}
};
let op_id = operation.get_op_id();
let detail = match operation.get_detail().which() {
Ok(v) => v,
Err(e) => {
return format!("(operation detail not in schema: {})", e);
}
};
format!(
"#{} {}",
op_id,
self.get_rpc_operation_detail_debug_info(&detail)
)
}
#[allow(clippy::useless_format)]
pub(super) fn get_rpc_operation_detail_debug_info(
&self,
detail: &veilid_capnp::operation::detail::WhichReader,
) -> String {
match detail {
veilid_capnp::operation::detail::InfoQ(_) => {
format!("InfoQ")
}
veilid_capnp::operation::detail::InfoA(_) => {
format!("InfoA")
}
veilid_capnp::operation::detail::ValidateDialInfo(_) => {
format!("ValidateDialInfo")
}
veilid_capnp::operation::detail::FindNodeQ(d) => {
let fnqr = match d {
Ok(fnqr) => fnqr,
Err(e) => {
return format!("(invalid detail: {})", e);
}
};
let nidr = match fnqr.get_node_id() {
Ok(nidr) => nidr,
Err(e) => {
return format!("(invalid node id: {})", e);
}
};
let pir = match fnqr.get_peer_info() {
Ok(pir) => pir,
Err(e) => {
return format!("(invalid peer_info: {})", e);
}
};
let node_id = decode_public_key(&nidr);
let peer_info = match decode_peer_info(&pir) {
Ok(pi) => pi,
Err(e) => {
return e.to_string();
}
};
format!(
"FindNodeQ: node_id={} peer_info={:?}",
node_id.encode(),
peer_info
)
}
veilid_capnp::operation::detail::FindNodeA(_) => {
format!("FindNodeA")
}
veilid_capnp::operation::detail::Route(_) => {
format!("Route")
}
veilid_capnp::operation::detail::GetValueQ(_) => {
format!("GetValueQ")
}
veilid_capnp::operation::detail::GetValueA(_) => {
format!("GetValueA")
}
veilid_capnp::operation::detail::SetValueQ(_) => {
format!("SetValueQ")
}
veilid_capnp::operation::detail::SetValueA(_) => {
format!("SetValueA")
}
veilid_capnp::operation::detail::WatchValueQ(_) => {
format!("WatchValueQ")
}
veilid_capnp::operation::detail::WatchValueA(_) => {
format!("WatchValueA")
}
veilid_capnp::operation::detail::ValueChanged(_) => {
format!("ValueChanged")
}
veilid_capnp::operation::detail::SupplyBlockQ(_) => {
format!("SupplyBlockQ")
}
veilid_capnp::operation::detail::SupplyBlockA(_) => {
format!("SupplyBlockA")
}
veilid_capnp::operation::detail::FindBlockQ(_) => {
format!("FindBlockQ")
}
veilid_capnp::operation::detail::FindBlockA(_) => {
format!("FindBlockA")
}
veilid_capnp::operation::detail::SignalQ(_) => {
format!("SignalQ")
}
veilid_capnp::operation::detail::SignalA(_) => {
format!("SignalA")
}
veilid_capnp::operation::detail::ReturnReceipt(_) => {
format!("ReturnReceipt")
}
veilid_capnp::operation::detail::StartTunnelQ(_) => {
format!("StartTunnelQ")
}
veilid_capnp::operation::detail::StartTunnelA(_) => {
format!("StartTunnelA")
}
veilid_capnp::operation::detail::CompleteTunnelQ(_) => {
format!("CompleteTunnelQ")
}
veilid_capnp::operation::detail::CompleteTunnelA(_) => {
format!("CompleteTunnelA")
}
veilid_capnp::operation::detail::CancelTunnelQ(_) => {
format!("CancelTunnelQ")
}
veilid_capnp::operation::detail::CancelTunnelA(_) => {
format!("CancelTunnelA")
}
}
}
}

View File

@ -1,4 +1,9 @@
mod coders; mod coders;
mod debug;
mod private_route;
pub use debug::*;
pub use private_route::*;
use crate::dht::*; use crate::dht::*;
use crate::intf::utils::channel::*; use crate::intf::utils::channel::*;
@ -18,77 +23,6 @@ use routing_table::*;
type OperationId = u64; type OperationId = u64;
#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Ord)]
pub enum RPCError {
Timeout,
InvalidFormat,
Unimplemented(String),
Protocol(String),
Internal(String),
}
pub fn rpc_error_internal<T: AsRef<str>>(x: T) -> RPCError {
error!("RPCError Internal: {}", x.as_ref());
RPCError::Internal(x.as_ref().to_owned())
}
pub fn rpc_error_capnp_error(e: capnp::Error) -> RPCError {
error!("RPCError Protocol: {}", &e.description);
RPCError::Protocol(e.description)
}
pub fn rpc_error_capnp_notinschema(e: capnp::NotInSchema) -> RPCError {
error!("RPCError Protocol: not in schema: {}", &e.0);
RPCError::Protocol(format!("not in schema: {}", &e.0))
}
pub fn rpc_error_unimplemented<T: AsRef<str>>(x: T) -> RPCError {
error!("RPCError Unimplemented: {}", x.as_ref());
RPCError::Unimplemented(x.as_ref().to_owned())
}
impl fmt::Display for RPCError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RPCError::Timeout => write!(f, "[RPCError: Timeout]"),
RPCError::InvalidFormat => write!(f, "[RPCError: InvalidFormat]"),
RPCError::Unimplemented(s) => write!(f, "[RPCError: Unimplemented({})]", s),
RPCError::Protocol(s) => write!(f, "[RPCError: Protocol({})]", s),
RPCError::Internal(s) => write!(f, "[RPCError: Internal({})]", s),
}
}
}
#[macro_export]
macro_rules! map_error_internal {
($x:expr) => {
|_| rpc_error_internal($x)
};
}
#[macro_export]
macro_rules! map_error_string {
() => {
|s| rpc_error_internal(&s)
};
}
#[macro_export]
macro_rules! map_error_capnp_error {
() => {
|e| rpc_error_capnp_error(e)
};
}
#[macro_export]
macro_rules! map_error_capnp_notinschema {
() => {
|e| rpc_error_capnp_notinschema(e)
};
}
#[macro_export]
macro_rules! map_error_panic {
() => {
|_| panic!("oops")
};
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Destination { pub enum Destination {
Direct(NodeRef), Direct(NodeRef),
@ -207,7 +141,7 @@ pub struct RPCProcessorInner {
routing_table: RoutingTable, routing_table: RoutingTable,
node_id: key::DHTKey, node_id: key::DHTKey,
node_id_secret: key::DHTKeySecret, node_id_secret: key::DHTKeySecret,
channel: Option<(Sender<RPCMessage>, Receiver<RPCMessage>)>, send_channel: Option<Sender<RPCMessage>>,
timeout: u64, timeout: u64,
max_route_hop_count: usize, max_route_hop_count: usize,
waiting_rpc_table: BTreeMap<OperationId, EventualValue<RPCMessageReader>>, waiting_rpc_table: BTreeMap<OperationId, EventualValue<RPCMessageReader>>,
@ -228,7 +162,7 @@ impl RPCProcessor {
routing_table: network_manager.routing_table(), routing_table: network_manager.routing_table(),
node_id: key::DHTKey::default(), node_id: key::DHTKey::default(),
node_id_secret: key::DHTKeySecret::default(), node_id_secret: key::DHTKeySecret::default(),
channel: None, send_channel: None,
timeout: 10000000, timeout: 10000000,
max_route_hop_count: 7, max_route_hop_count: 7,
waiting_rpc_table: BTreeMap::new(), waiting_rpc_table: BTreeMap::new(),
@ -327,197 +261,6 @@ impl RPCProcessor {
Ok(nr) Ok(nr)
} }
//////////////////////////////////////////////////////////////////////
fn new_stub_private_route<'a, T>(
&self,
dest_node_id: key::DHTKey,
builder: &'a mut ::capnp::message::Builder<T>,
) -> Result<veilid_capnp::private_route::Reader<'a>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut pr = builder.init_root::<veilid_capnp::private_route::Builder>();
let mut pr_pk = pr.reborrow().init_public_key();
encode_public_key(&dest_node_id, &mut pr_pk)?;
pr.set_hop_count(0u8);
// leave firstHop as null
Ok(pr.into_reader())
}
fn encode_safety_route<'a>(
&self,
safety_route: &SafetyRouteSpec,
private_route: veilid_capnp::private_route::Reader<'a>,
builder: &'a mut veilid_capnp::safety_route::Builder<'a>,
) -> Result<(), RPCError> {
// Ensure the total hop count isn't too long for our config
let pr_hopcount = private_route.get_hop_count() as usize;
let sr_hopcount = safety_route.hops.len();
let hopcount = 1 + sr_hopcount + pr_hopcount;
if hopcount > self.inner.lock().max_route_hop_count {
return Err(rpc_error_internal("hop count too long for route"));
}
// Build the safety route
let mut sr_pk = builder.reborrow().init_public_key();
encode_public_key(&safety_route.public_key, &mut sr_pk)?;
builder.set_hop_count(
u8::try_from(sr_hopcount)
.map_err(map_error_internal!("hop count too large for safety route"))?,
);
// Build all the hops in the safety route
let mut hops_builder = builder.reborrow().init_hops();
if sr_hopcount == 0 {
hops_builder
.set_private(private_route)
.map_err(map_error_internal!(
"invalid private route while encoding safety route"
))?;
} else {
// start last blob-to-encrypt data off as private route
let mut blob_data = {
let mut pr_message = ::capnp::message::Builder::new_default();
pr_message
.set_root_canonical(private_route)
.map_err(map_error_internal!(
"invalid private route while encoding safety route"
))?;
let mut blob_data = builder_to_vec(pr_message)?;
// append the private route tag so we know how to decode it later
blob_data.push(1u8);
blob_data
};
// Encode each hop from inside to outside
// skips the outermost hop since that's entering the
// safety route and does not include the dialInfo
// (outer hop is a RouteHopData, not a RouteHop).
// Each loop mutates 'nonce', and 'blob_data'
let mut nonce = Crypto::get_random_nonce();
for h in (1..sr_hopcount).rev() {
// Get blob to encrypt for next hop
blob_data = {
// RouteHop
let mut rh_message = ::capnp::message::Builder::new_default();
let mut rh_builder = rh_message.init_root::<veilid_capnp::route_hop::Builder>();
let mut di_builder = rh_builder.reborrow().init_dial_info();
encode_node_dial_info_single(&safety_route.hops[h].dial_info, &mut di_builder)?;
// RouteHopData
let mut rhd_builder = rh_builder.init_next_hop();
// Add the nonce
let mut rhd_nonce = rhd_builder.reborrow().init_nonce();
encode_nonce(&nonce, &mut rhd_nonce);
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
let dh_secret = self
.crypto
.cached_dh(
&safety_route.hops[h].dial_info.node_id.key,
&safety_route.secret_key,
)
.map_err(map_error_internal!("dh failed"))?;
let enc_msg_data =
Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(map_error_internal!("encryption failed"))?;
rhd_builder.set_blob(enc_msg_data.as_slice());
let mut blob_data = builder_to_vec(rh_message)?;
// append the route hop tag so we know how to decode it later
blob_data.push(0u8);
blob_data
};
// Make another nonce for the next hop
nonce = Crypto::get_random_nonce();
}
// Encode first RouteHopData
let mut first_rhd_builder = hops_builder.init_data();
let mut first_rhd_nonce = first_rhd_builder.reborrow().init_nonce();
encode_nonce(&nonce, &mut first_rhd_nonce);
let dh_secret = self
.crypto
.cached_dh(
&safety_route.hops[0].dial_info.node_id.key,
&safety_route.secret_key,
)
.map_err(map_error_internal!("dh failed"))?;
let enc_msg_data = Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(map_error_internal!("encryption failed"))?;
first_rhd_builder.set_blob(enc_msg_data.as_slice());
}
Ok(())
}
// Wrap an operation inside a route
fn wrap_with_route<'a>(
&self,
safety_route: Option<&SafetyRouteSpec>,
private_route: veilid_capnp::private_route::Reader<'a>,
message_data: Vec<u8>,
) -> Result<Vec<u8>, RPCError> {
// Get stuff before we lock inner
let op_id = self.get_next_op_id();
// Encrypt routed operation
let nonce = Crypto::get_random_nonce();
let pr_pk_reader = private_route
.get_public_key()
.map_err(map_error_internal!("public key is invalid"))?;
let pr_pk = decode_public_key(&pr_pk_reader);
let stub_safety_route = SafetyRouteSpec::new();
let sr = safety_route.unwrap_or(&stub_safety_route);
let dh_secret = self
.crypto
.cached_dh(&pr_pk, &sr.secret_key)
.map_err(map_error_internal!("dh failed"))?;
let enc_msg_data = Crypto::encrypt(&message_data, &nonce, &dh_secret, None)
.map_err(map_error_internal!("encryption failed"))?;
// Prepare route operation
let route_msg = {
let mut route_msg = ::capnp::message::Builder::new_default();
let mut route_operation = route_msg.init_root::<veilid_capnp::operation::Builder>();
// Doesn't matter what this op id because there's no answer
// but it shouldn't conflict with any other op id either
route_operation.set_op_id(op_id);
// Answers don't get a 'respond'
let mut respond_to = route_operation.reborrow().init_respond_to();
respond_to.set_none(());
// Set up 'route' operation
let mut route = route_operation.reborrow().init_detail().init_route();
// Set the safety route we've constructed
let mut msg_sr = route.reborrow().init_safety_route();
self.encode_safety_route(sr, private_route, &mut msg_sr)?;
// Put in the encrypted operation we're routing
let mut msg_operation = route.init_operation();
msg_operation.reborrow().init_signatures(0);
let mut route_nonce = msg_operation.reborrow().init_nonce();
encode_nonce(&nonce, &mut route_nonce);
let data = msg_operation.reborrow().init_data(
enc_msg_data
.len()
.try_into()
.map_err(map_error_internal!("data too large"))?,
);
data.copy_from_slice(enc_msg_data.as_slice());
route_msg
};
// Convert message to bytes and return it
let out = builder_to_vec(route_msg)?;
Ok(out)
}
// set up wait for reply // set up wait for reply
fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<RPCMessageReader> { fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<RPCMessageReader> {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -619,6 +362,8 @@ impl RPCProcessor {
message: capnp::message::Reader<T>, message: capnp::message::Reader<T>,
safety_route_spec: Option<&SafetyRouteSpec>, safety_route_spec: Option<&SafetyRouteSpec>,
) -> Result<Option<WaitableReply>, RPCError> { ) -> Result<Option<WaitableReply>, RPCError> {
log_rpc!(self.get_rpc_request_debug_info(&dest, &message, &safety_route_spec));
let (op_id, wants_answer, is_ping) = { let (op_id, wants_answer, is_ping) = {
let operation = message let operation = message
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
@ -626,6 +371,7 @@ impl RPCProcessor {
let op_id = operation.get_op_id(); let op_id = operation.get_op_id();
let wants_answer = self.wants_answer(&operation)?; let wants_answer = self.wants_answer(&operation)?;
let is_ping = operation.get_detail().has_info_q(); let is_ping = operation.get_detail().has_info_q();
(op_id, wants_answer, is_ping) (op_id, wants_answer, is_ping)
}; };
@ -727,7 +473,9 @@ impl RPCProcessor {
let node_ref = match out_noderef { let node_ref = match out_noderef {
None => { None => {
// resolve node // resolve node
self.resolve_node(out_node_id).await? self.resolve_node(out_node_id)
.await
.map_err(logthru_rpc!(error))?
} }
Some(nr) => { Some(nr) => {
// got the node in the routing table already // got the node in the routing table already
@ -748,6 +496,7 @@ impl RPCProcessor {
.network_manager() .network_manager()
.send_envelope(node_ref.clone(), out) .send_envelope(node_ref.clone(), out)
.await .await
.map_err(logthru_rpc!(error))
.map_err(RPCError::Internal) .map_err(RPCError::Internal)
{ {
// Make sure to clean up op id waiter in case of error // Make sure to clean up op id waiter in case of error
@ -792,6 +541,8 @@ impl RPCProcessor {
reply_msg: capnp::message::Reader<T>, reply_msg: capnp::message::Reader<T>,
safety_route_spec: Option<&SafetyRouteSpec>, safety_route_spec: Option<&SafetyRouteSpec>,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
log_rpc!(self.get_rpc_reply_debug_info(&request_rpcreader, &reply_msg, &safety_route_spec));
// //
let out_node_id; let out_node_id;
let mut out_noderef: Option<NodeRef> = None; let mut out_noderef: Option<NodeRef> = None;
@ -821,7 +572,7 @@ impl RPCProcessor {
veilid_capnp::operation::respond_to::None(_) => { veilid_capnp::operation::respond_to::None(_) => {
// Do not respond // Do not respond
// -------------- // --------------
return Ok(()); return Err(rpc_error_internal("no response requested"));
} }
veilid_capnp::operation::respond_to::Sender(_) => { veilid_capnp::operation::respond_to::Sender(_) => {
// Respond to envelope source node, possibly through a relay if the request arrived that way // Respond to envelope source node, possibly through a relay if the request arrived that way
@ -1329,7 +1080,6 @@ impl RPCProcessor {
} }
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
async fn process_rpc_message_version_0(&self, msg: RPCMessage) -> Result<(), RPCError> { async fn process_rpc_message_version_0(&self, msg: RPCMessage) -> Result<(), RPCError> {
let reader = capnp::message::Reader::new(msg.data, Default::default()); let reader = capnp::message::Reader::new(msg.data, Default::default());
let rpcreader = RPCMessageReader { let rpcreader = RPCMessageReader {
@ -1442,9 +1192,10 @@ impl RPCProcessor {
async fn rpc_worker(self, receiver: Receiver<RPCMessage>) { async fn rpc_worker(self, receiver: Receiver<RPCMessage>) {
while let Ok(msg) = receiver.recv().await { while let Ok(msg) = receiver.recv().await {
if let Err(e) = self.process_rpc_message(msg).await { let _ = self
error!("Couldn't process rpc message: {}", e); .process_rpc_message(msg)
} .await
.map_err(logthru_rpc!("couldn't process rpc message"));
} }
} }
@ -1479,7 +1230,7 @@ impl RPCProcessor {
inner.timeout = timeout; inner.timeout = timeout;
inner.max_route_hop_count = max_route_hop_count; inner.max_route_hop_count = max_route_hop_count;
let channel = channel(queue_size as usize); let channel = channel(queue_size as usize);
inner.channel = Some(channel.clone()); inner.send_channel = Some(channel.0.clone());
// spin up N workers // spin up N workers
trace!("Spinning up {} RPC workers", concurrency); trace!("Spinning up {} RPC workers", concurrency);
@ -1502,8 +1253,7 @@ impl RPCProcessor {
envelope: envelope::Envelope, envelope: envelope::Envelope,
body: Vec<u8>, body: Vec<u8>,
peer_noderef: NodeRef, peer_noderef: NodeRef,
) -> Result<(), ()> { ) -> Result<(), String> {
trace!("enqueue_message: body len = {}", body.len());
let msg = RPCMessage { let msg = RPCMessage {
header: RPCMessageHeader { header: RPCMessageHeader {
timestamp: get_timestamp(), timestamp: get_timestamp(),
@ -1515,9 +1265,12 @@ impl RPCProcessor {
}; };
let send_channel = { let send_channel = {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.channel.as_ref().unwrap().0.clone() inner.send_channel.as_ref().unwrap().clone()
}; };
send_channel.try_send(msg).await.map_err(drop)?; send_channel
.try_send(msg)
.await
.map_err(|e| format!("failed to enqueue received RPC message: {:?}", e))?;
Ok(()) Ok(())
} }

View File

@ -0,0 +1,194 @@
use super::*;
impl RPCProcessor {
//////////////////////////////////////////////////////////////////////
pub(super) fn new_stub_private_route<'a, T>(
&self,
dest_node_id: key::DHTKey,
builder: &'a mut ::capnp::message::Builder<T>,
) -> Result<veilid_capnp::private_route::Reader<'a>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut pr = builder.init_root::<veilid_capnp::private_route::Builder>();
let mut pr_pk = pr.reborrow().init_public_key();
encode_public_key(&dest_node_id, &mut pr_pk)?;
pr.set_hop_count(0u8);
// leave firstHop as null
Ok(pr.into_reader())
}
fn encode_safety_route<'a>(
&self,
safety_route: &SafetyRouteSpec,
private_route: veilid_capnp::private_route::Reader<'a>,
builder: &'a mut veilid_capnp::safety_route::Builder<'a>,
) -> Result<(), RPCError> {
// Ensure the total hop count isn't too long for our config
let pr_hopcount = private_route.get_hop_count() as usize;
let sr_hopcount = safety_route.hops.len();
let hopcount = 1 + sr_hopcount + pr_hopcount;
if hopcount > self.inner.lock().max_route_hop_count {
return Err(rpc_error_internal("hop count too long for route"));
}
// Build the safety route
let mut sr_pk = builder.reborrow().init_public_key();
encode_public_key(&safety_route.public_key, &mut sr_pk)?;
builder.set_hop_count(
u8::try_from(sr_hopcount)
.map_err(map_error_internal!("hop count too large for safety route"))?,
);
// Build all the hops in the safety route
let mut hops_builder = builder.reborrow().init_hops();
if sr_hopcount == 0 {
hops_builder
.set_private(private_route)
.map_err(map_error_internal!(
"invalid private route while encoding safety route"
))?;
} else {
// start last blob-to-encrypt data off as private route
let mut blob_data = {
let mut pr_message = ::capnp::message::Builder::new_default();
pr_message
.set_root_canonical(private_route)
.map_err(map_error_internal!(
"invalid private route while encoding safety route"
))?;
let mut blob_data = builder_to_vec(pr_message)?;
// append the private route tag so we know how to decode it later
blob_data.push(1u8);
blob_data
};
// Encode each hop from inside to outside
// skips the outermost hop since that's entering the
// safety route and does not include the dialInfo
// (outer hop is a RouteHopData, not a RouteHop).
// Each loop mutates 'nonce', and 'blob_data'
let mut nonce = Crypto::get_random_nonce();
for h in (1..sr_hopcount).rev() {
// Get blob to encrypt for next hop
blob_data = {
// RouteHop
let mut rh_message = ::capnp::message::Builder::new_default();
let mut rh_builder = rh_message.init_root::<veilid_capnp::route_hop::Builder>();
let mut di_builder = rh_builder.reborrow().init_dial_info();
encode_node_dial_info_single(&safety_route.hops[h].dial_info, &mut di_builder)?;
// RouteHopData
let mut rhd_builder = rh_builder.init_next_hop();
// Add the nonce
let mut rhd_nonce = rhd_builder.reborrow().init_nonce();
encode_nonce(&nonce, &mut rhd_nonce);
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
let dh_secret = self
.crypto
.cached_dh(
&safety_route.hops[h].dial_info.node_id.key,
&safety_route.secret_key,
)
.map_err(map_error_internal!("dh failed"))?;
let enc_msg_data =
Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(map_error_internal!("encryption failed"))?;
rhd_builder.set_blob(enc_msg_data.as_slice());
let mut blob_data = builder_to_vec(rh_message)?;
// append the route hop tag so we know how to decode it later
blob_data.push(0u8);
blob_data
};
// Make another nonce for the next hop
nonce = Crypto::get_random_nonce();
}
// Encode first RouteHopData
let mut first_rhd_builder = hops_builder.init_data();
let mut first_rhd_nonce = first_rhd_builder.reborrow().init_nonce();
encode_nonce(&nonce, &mut first_rhd_nonce);
let dh_secret = self
.crypto
.cached_dh(
&safety_route.hops[0].dial_info.node_id.key,
&safety_route.secret_key,
)
.map_err(map_error_internal!("dh failed"))?;
let enc_msg_data = Crypto::encrypt(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(map_error_internal!("encryption failed"))?;
first_rhd_builder.set_blob(enc_msg_data.as_slice());
}
Ok(())
}
// Wrap an operation inside a route
pub(super) fn wrap_with_route<'a>(
&self,
safety_route: Option<&SafetyRouteSpec>,
private_route: veilid_capnp::private_route::Reader<'a>,
message_data: Vec<u8>,
) -> Result<Vec<u8>, RPCError> {
// Get stuff before we lock inner
let op_id = self.get_next_op_id();
// Encrypt routed operation
let nonce = Crypto::get_random_nonce();
let pr_pk_reader = private_route
.get_public_key()
.map_err(map_error_internal!("public key is invalid"))?;
let pr_pk = decode_public_key(&pr_pk_reader);
let stub_safety_route = SafetyRouteSpec::new();
let sr = safety_route.unwrap_or(&stub_safety_route);
let dh_secret = self
.crypto
.cached_dh(&pr_pk, &sr.secret_key)
.map_err(map_error_internal!("dh failed"))?;
let enc_msg_data = Crypto::encrypt(&message_data, &nonce, &dh_secret, None)
.map_err(map_error_internal!("encryption failed"))?;
// Prepare route operation
let route_msg = {
let mut route_msg = ::capnp::message::Builder::new_default();
let mut route_operation = route_msg.init_root::<veilid_capnp::operation::Builder>();
// Doesn't matter what this op id because there's no answer
// but it shouldn't conflict with any other op id either
route_operation.set_op_id(op_id);
// Answers don't get a 'respond'
let mut respond_to = route_operation.reborrow().init_respond_to();
respond_to.set_none(());
// Set up 'route' operation
let mut route = route_operation.reborrow().init_detail().init_route();
// Set the safety route we've constructed
let mut msg_sr = route.reborrow().init_safety_route();
self.encode_safety_route(sr, private_route, &mut msg_sr)?;
// Put in the encrypted operation we're routing
let mut msg_operation = route.init_operation();
msg_operation.reborrow().init_signatures(0);
let mut route_nonce = msg_operation.reborrow().init_nonce();
encode_nonce(&nonce, &mut route_nonce);
let data = msg_operation.reborrow().init_data(
enc_msg_data
.len()
.try_into()
.map_err(map_error_internal!("data too large"))?,
);
data.copy_from_slice(enc_msg_data.as_slice());
route_msg
};
// Convert message to bytes and return it
let out = builder_to_vec(route_msg)?;
Ok(out)
}
}

View File

@ -0,0 +1,181 @@
pub fn map_to_string<X: ToString>(arg: X) -> String {
arg.to_string()
}
/*
trait LogThru<T, E> {
fn log_thru<F, O: FnOnce(E) -> F>(self, op: O) -> Result<T, F>;
}
impl<T, E> LogThru<T, E> for Result<T, E> {
fn log_thru<F, O: FnOnce(E) -> F>(self, op: O) -> Result<T, F> {
match self {
Ok(t) => Ok(t),
Err(e) => Err(op(e)),
}
}
}
*/
#[macro_export]
macro_rules! fn_string {
($text:expr) => {
|| $text.to_string()
};
}
#[macro_export]
macro_rules! log_net {
(error $text:expr) => {error!(
target: "net",
"{}",
$text,
)};
(error $fmt:literal, $($arg:expr)+) => {
error!(target:"net", $fmt, $($arg)+);
};
($text:expr) => {trace!(
target: "net",
"{}",
$text,
)};
($fmt:literal, $($arg:expr)+) => {
trace!(target:"net", $fmt, $($arg)+);
}
}
#[macro_export]
macro_rules! log_rpc {
(error $text:expr) => { error!(
target: "rpc",
"{}",
$text,
)};
(error $fmt:literal, $($arg:expr)+) => {
error!(target:"rpc", $fmt, $($arg)+);
};
($text:expr) => {trace!(
target: "rpc",
"{}",
$text,
)};
($fmt:literal, $($arg:expr)+) => {
trace!(target:"rpc", $fmt, $($arg)+);
}
}
#[macro_export]
macro_rules! log_rtab {
(error $text:expr) => { error!(
target: "rtab",
"{}",
$text,
)};
(error $fmt:literal, $($arg:expr)+) => {
error!(target:"rtab", $fmt, $($arg)+);
};
($text:expr) => {trace!(
target: "rtab",
"{}",
$text,
)};
($fmt:literal, $($arg:expr)+) => {
trace!(target:"rtab", $fmt, $($arg)+);
}
}
#[macro_export]
macro_rules! logthru_net {
($($level:ident)?) => {
logthru!($($level)? "net")
};
($($level:ident)? $text:literal) => {
logthru!($($level)? "net", $text)
};
($($level:ident)? $fmt:literal, $($arg:expr)+) => {
logthru!($($level)? "net", $fmt, $($arg)+)
}
}
#[macro_export]
macro_rules! logthru_rpc {
($($level:ident)?) => {
logthru!($($level)? "rpc")
};
($($level:ident)? $text:literal) => {
logthru!($($level)? "rpc", $text)
};
($($level:ident)? $fmt:literal, $($arg:expr)+) => {
logthru!($($level)? "rpc", $fmt, $($arg)+)
}
}
#[macro_export]
macro_rules! logthru_rtab {
($($level:ident)?) => {
logthru!($($level)? "rtab")
};
($($level:ident)? $text:literal) => {
logthru!($($level)? "rtab", $text)
};
($($level:ident)? $fmt:literal, $($arg:expr)+) => {
logthru!($($level)? "rtab", $fmt, $($arg)+)
}
}
#[macro_export]
macro_rules! logthru {
// error
(error $target:literal) => (|e__| {
error!(
target: $target,
"[{}]",
e__,
);
e__
});
(error $target:literal, $text:literal) => (|e__| {
error!(
target: $target,
"[{}] {}",
e__,
$text
);
e__
});
(error $target:literal, $fmt:literal, $($arg:expr)+) => (|e__| {
error!(
target: $target,
concat!("[{}] ", $fmt),
e__,
$($arg)+
);
e__
});
// trace
($target:literal) => (|e__| {
trace!(
target: $target,
"[{}]",
e__,
);
e__
});
($target:literal, $text:literal) => (|e__| {
trace!(
target: $target,
"[{}] {}",
e__,
$text
);
e__
});
($target:literal, $fmt:literal, $($arg:expr)+) => (|e__| {
trace!(
target: $target,
concat!("[{}] ", $fmt),
e__,
$($arg)+
);
e__
})
}

View File

@ -5,6 +5,7 @@ mod eventual_value;
mod eventual_value_clone; mod eventual_value_clone;
mod ip_addr_port; mod ip_addr_port;
mod ip_extra; mod ip_extra;
mod log_thru;
mod single_future; mod single_future;
mod single_shot_eventual; mod single_shot_eventual;
mod split_url; mod split_url;
@ -13,6 +14,7 @@ mod tools;
pub use cfg_if::*; pub use cfg_if::*;
pub use log::*; pub use log::*;
pub use log_thru::*;
pub use parking_lot::*; pub use parking_lot::*;
pub use split_url::*; pub use split_url::*;
pub use static_assertions::*; pub use static_assertions::*;