refactor checkpoint

This commit is contained in:
John Smith
2022-04-16 11:18:54 -04:00
parent ddb74d993f
commit 71f7017235
23 changed files with 977 additions and 615 deletions

View File

@@ -7,8 +7,10 @@ mod node_status;
mod nonce;
mod peer_info;
mod private_safety_route;
mod protocol_set;
mod public_key;
mod sender_info;
mod signal_info;
mod socket_address;
pub use address::*;
@@ -20,6 +22,8 @@ pub use node_status::*;
pub use nonce::*;
pub use peer_info::*;
pub use private_safety_route::*;
pub use protocol_set::*;
pub use public_key::*;
pub use sender_info::*;
pub use signal_info::*;
pub use socket_address::*;

View File

@@ -7,38 +7,34 @@ pub fn encode_node_info(
) -> Result<(), RPCError> {
builder.set_network_class(encode_network_class(node_info.network_class));
let mut ps_builder = builder.reborrow().init_outbound_protocols();
encode_protocol_set(&node_info.outbound_protocols, &mut ps_builder)?;
let mut dil_builder = builder.reborrow().init_dial_info_list(
node_info
.dial_infos
.dial_info_list
.len()
.try_into()
.map_err(map_error_protocol!("too many dial infos in node info"))?,
);
for idx in 0..node_info.dial_infos.len() {
for idx in 0..node_info.dial_info_list.len() {
let mut di_builder = dil_builder.reborrow().get(idx as u32);
encode_dial_info(&node_info.dial_infos[idx], &mut di_builder)?;
encode_dial_info(&node_info.dial_info_list[idx], &mut di_builder)?;
}
let mut rdil_builder = builder.reborrow().init_relay_dial_info_list(
node_info
.relay_dial_infos
.len()
.try_into()
.map_err(map_error_protocol!(
"too many relay dial infos in node info"
))?,
);
for idx in 0..node_info.relay_dial_infos.len() {
let mut rdi_builder = rdil_builder.reborrow().get(idx as u32);
encode_dial_info(&node_info.relay_dial_infos[idx], &mut rdi_builder)?;
if let Some(rpi) = node_info.relay_peer_info {
let mut rpi_builder = builder.reborrow().init_relay_peer_info();
encode_peer_info(&rpi, &mut rpi_builder)?;
}
Ok(())
}
pub fn decode_node_info(reader: &veilid_capnp::node_info::Reader) -> Result<NodeInfo, RPCError> {
pub fn decode_node_info(
reader: &veilid_capnp::node_info::Reader,
allow_relay_peer_info: bool,
) -> Result<NodeInfo, RPCError> {
let network_class = decode_network_class(
reader
.reborrow()
@@ -46,37 +42,47 @@ pub fn decode_node_info(reader: &veilid_capnp::node_info::Reader) -> Result<Node
.map_err(map_error_capnp_notinschema!())?,
);
let outbound_protocols = decode_protocol_set(
&reader
.reborrow()
.get_outbound_protocols()
.map_err(map_error_capnp_notinschema!())?,
)?;
let dil_reader = reader
.reborrow()
.get_dial_info_list()
.map_err(map_error_capnp_error!())?;
let mut dial_infos = Vec::<DialInfo>::with_capacity(
let mut dial_info_list = Vec::<DialInfo>::with_capacity(
dil_reader
.len()
.try_into()
.map_err(map_error_protocol!("too many dial infos"))?,
);
for di in dil_reader.iter() {
dial_infos.push(decode_dial_info(&di)?)
dial_info_list.push(decode_dial_info(&di)?)
}
let rdil_reader = reader
.reborrow()
.get_relay_dial_info_list()
.map_err(map_error_capnp_error!())?;
let mut relay_dial_infos = Vec::<DialInfo>::with_capacity(
rdil_reader
.len()
.try_into()
.map_err(map_error_protocol!("too many relay dial infos"))?,
);
for di in rdil_reader.iter() {
relay_dial_infos.push(decode_dial_info(&di)?)
}
let relay_peer_info = if allow_relay_peer_info {
if reader.has_relay_peer_info() {
Some(Box::new(decode_peer_info(
&reader
.reborrow()
.get_relay_peer_info()
.map_err(map_error_capnp_notinschema!())?,
false,
)?))
} else {
None
}
} else {
None
};
Ok(NodeInfo {
network_class,
dial_infos,
relay_dial_infos,
outbound_protocols,
dial_info_list,
relay_peer_info,
})
}

View File

@@ -14,7 +14,10 @@ pub fn encode_peer_info(
Ok(())
}
pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result<PeerInfo, RPCError> {
pub fn decode_peer_info(
reader: &veilid_capnp::peer_info::Reader,
allow_relay_peer_info: bool,
) -> Result<PeerInfo, RPCError> {
let nid_reader = reader
.reborrow()
.get_node_id()
@@ -23,7 +26,7 @@ pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result<Peer
.reborrow()
.get_node_info()
.map_err(map_error_capnp_error!())?;
let node_info = decode_node_info(&ni_reader)?;
let node_info = decode_node_info(&ni_reader, allow_relay_peer_info)?;
Ok(PeerInfo {
node_id: NodeId::new(decode_public_key(&nid_reader)),

View File

@@ -0,0 +1,25 @@
use crate::*;
use rpc_processor::*;
pub fn encode_protocol_set(
protocol_set: &ProtocolSet,
builder: &mut veilid_capnp::protocol_set::Builder,
) -> Result<(), RPCError> {
builder.set_udp(protocol_set.udp);
builder.set_tcp(protocol_set.tcp);
builder.set_ws(protocol_set.ws);
builder.set_wss(protocol_set.wss);
Ok(())
}
pub fn decode_protocol_set(
reader: &veilid_capnp::protocol_set::Reader,
) -> Result<ProtocolSet, RPCError> {
Ok(ProtocolSet {
udp: reader.reborrow().get_udp(),
tcp: reader.reborrow().get_tcp(),
ws: reader.reborrow().get_ws(),
wss: reader.reborrow().get_wss(),
})
}

View File

@@ -0,0 +1,86 @@
use crate::*;
use rpc_processor::*;
pub fn encode_signal_info(
signal_info: &SignalInfo,
builder: &mut veilid_capnp::operation_signal::Builder,
) -> Result<(), RPCError> {
match signal_info {
SignalInfo::HolePunch { receipt, node_info } => {
let mut hp_builder = builder.init_hole_punch();
let rcpt_builder =
hp_builder
.reborrow()
.init_receipt(receipt.len().try_into().map_err(map_error_protocol!(
"invalid receipt length in hole punch signal info"
))?);
rcpt_builder.copy_from_slice(receipt.as_slice());
let mut ni_builder = hp_builder.init_node_info();
encode_node_info(&node_info, &mut ni_builder)?;
}
SignalInfo::ReverseConnect { receipt, node_info } => {
let mut hp_builder = builder.init_reverse_connect();
let rcpt_builder =
hp_builder
.reborrow()
.init_receipt(receipt.len().try_into().map_err(map_error_protocol!(
"invalid receipt length in reverse connect signal info"
))?);
rcpt_builder.copy_from_slice(receipt.as_slice());
let mut ni_builder = hp_builder.init_node_info();
encode_node_info(&node_info, &mut ni_builder)?;
}
}
Ok(())
}
pub fn decode_signal_info(
reader: &veilid_capnp::operation_signal::Reader,
) -> Result<SignalInfo, RPCError> {
Ok(
match reader
.which()
.map_err(map_error_internal!("invalid signal operation"))?
{
veilid_capnp::operation_signal::HolePunch(r) => {
// Extract hole punch reader
let r = match r {
Ok(r) => r,
Err(_) => return Err(rpc_error_internal("invalid hole punch")),
};
let receipt = r
.get_receipt()
.map_err(map_error_protocol!(
"invalid receipt in hole punch signal info"
))?
.to_vec();
let ni_reader = r.get_node_info().map_err(map_error_protocol!(
"invalid node info in hole punch signal info"
))?;
let node_info = decode_node_info(&ni_reader, true)?;
SignalInfo::HolePunch { receipt, node_info }
}
veilid_capnp::operation_signal::ReverseConnect(r) => {
// Extract reverse connect reader
let r = match r {
Ok(r) => r,
Err(_) => return Err(rpc_error_internal("invalid reverse connect")),
};
let receipt = r
.get_receipt()
.map_err(map_error_protocol!(
"invalid receipt in reverse connect signal info"
))?
.to_vec();
let ni_reader = r.get_node_info().map_err(map_error_protocol!(
"invalid node info in reverse connect signal info"
))?;
let node_info = decode_node_info(&ni_reader, true)?;
SignalInfo::ReverseConnect { receipt, node_info }
}
},
)
}

View File

@@ -4,10 +4,12 @@ use super::*;
pub enum RPCError {
Timeout,
InvalidFormat,
Unreachable(DHTKey),
Unimplemented(String),
Protocol(String),
Internal(String),
}
pub fn rpc_error_internal<T: AsRef<str>>(x: T) -> RPCError {
error!("RPCError Internal: {}", x.as_ref());
RPCError::Internal(x.as_ref().to_owned())
@@ -34,6 +36,7 @@ impl fmt::Display for RPCError {
match self {
RPCError::Timeout => write!(f, "[RPCError: Timeout]"),
RPCError::InvalidFormat => write!(f, "[RPCError: InvalidFormat]"),
RPCError::Unreachable(k) => write!(f, "[RPCError: Unreachable({})]", k),
RPCError::Unimplemented(s) => write!(f, "[RPCError: Unimplemented({})]", s),
RPCError::Protocol(s) => write!(f, "[RPCError: Protocol({})]", s),
RPCError::Internal(s) => write!(f, "[RPCError: Internal({})]", s),
@@ -202,37 +205,55 @@ impl RPCProcessor {
}
};
let dil_reader = match fnqr.reborrow().get_dial_info_list() {
Ok(dilr) => dilr,
let sni_reader = match fnqr.reborrow().get_sender_node_info() {
Ok(snir) => snir,
Err(e) => {
return format!("(invalid dial info list: {})", e);
return format!("(invalid sender node info: {})", e);
}
};
let sender_node_info = match decode_node_info(&sni_reader, true) {
Ok(v) => v,
Err(e) => {
return format!("(unable to decode node info: {})", e);
}
};
let mut dial_infos =
Vec::<DialInfo>::with_capacity(match dil_reader.len().try_into() {
Ok(v) => v,
Err(e) => {
return format!("(too many dial infos: {})", e);
}
});
for di in dil_reader.iter() {
dial_infos.push(match decode_dial_info(&di) {
Ok(v) => v,
Err(e) => {
return format!("(unable to decode dial info: {})", e);
}
});
}
let node_id = decode_public_key(&nidr);
format!(
"FindNodeQ: node_id={} dial_infos={:?}",
"FindNodeQ: node_id={} sender_node_info={:#?}",
node_id.encode(),
dial_infos
sender_node_info
)
}
veilid_capnp::operation::detail::FindNodeA(_) => {
format!("FindNodeA")
veilid_capnp::operation::detail::FindNodeA(d) => {
let fnar = match d {
Ok(fnar) => fnar,
Err(e) => {
return format!("(invalid detail: {})", e);
}
};
let p_reader = match fnar.reborrow().get_peers() {
Ok(pr) => pr,
Err(e) => {
return format!("(invalid sender node info: {})", e);
}
};
let mut peers = Vec::<PeerInfo>::with_capacity(match p_reader.len().try_into() {
Ok(v) => v,
Err(e) => return format!("invalid peer count: {}", e),
});
for p in p_reader.iter() {
let peer_info = match decode_peer_info(&p, true) {
Ok(v) => v,
Err(e) => {
return format!("(unable to decode peer info: {})", e);
}
};
peers.push(peer_info);
}
format!("FindNodeA: peers={:#?}", peers)
}
veilid_capnp::operation::detail::Route(_) => {
format!("Route")
@@ -270,11 +291,8 @@ impl RPCProcessor {
veilid_capnp::operation::detail::FindBlockA(_) => {
format!("FindBlockA")
}
veilid_capnp::operation::detail::SignalQ(_) => {
format!("SignalQ")
}
veilid_capnp::operation::detail::SignalA(_) => {
format!("SignalA")
veilid_capnp::operation::detail::Signal(_) => {
format!("Signal")
}
veilid_capnp::operation::detail::ReturnReceipt(_) => {
format!("ReturnReceipt")

View File

@@ -23,8 +23,10 @@ type OperationId = u64;
#[derive(Debug, Clone)]
pub enum Destination {
Direct(NodeRef),
PrivateRoute(PrivateRoute),
Direct(NodeRef), // Can only be sent directly
Normal(NodeRef), // Can be sent via relays as well as directly
Relay(NodeRef, DHTKey), // Can only be sent via a relay
PrivateRoute(PrivateRoute), // Must be encapsulated in a private route
}
#[derive(Debug, Clone)]
@@ -131,7 +133,7 @@ struct WaitableReply {
#[derive(Clone, Debug, Default)]
pub struct InfoAnswer {
pub latency: u64,
pub node_info: NodeInfo,
pub node_status: NodeStatus,
pub sender_info: SenderInfo,
}
@@ -159,7 +161,7 @@ pub struct RPCProcessorInner {
pub struct RPCProcessor {
crypto: Crypto,
config: VeilidConfig,
default_peer_scope: PeerScope,
enable_local_peer_scope: bool,
inner: Arc<Mutex<RPCProcessorInner>>,
}
@@ -181,16 +183,11 @@ impl RPCProcessor {
Self {
crypto: network_manager.crypto(),
config: network_manager.config(),
default_peer_scope: if !network_manager
enable_local_peer_scope: network_manager
.config()
.get()
.network
.enable_local_peer_scope
{
PeerScope::Global
} else {
PeerScope::All
},
.enable_local_peer_scope,
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
}
}
@@ -220,16 +217,18 @@ impl RPCProcessor {
fn filter_peer_scope(&self, peer_info: &PeerInfo) -> bool {
// reject attempts to include non-public addresses in results
if self.default_peer_scope == PeerScope::Global {
for di in &peer_info.node_info.dial_infos {
for di in &peer_info.node_info.dial_info_list {
if !di.is_global() {
// non-public address causes rejection
return false;
}
}
for di in &peer_info.node_info.relay_dial_infos {
if !di.is_global() {
// non-public address causes rejection
return false;
if let Some(rpi) = peer_info.node_info.relay_peer_info {
for di in &rpi.node_info.dial_info_list {
if !di.is_global() {
// non-public address causes rejection
return false;
}
}
}
}
@@ -277,9 +276,9 @@ impl RPCProcessor {
// First see if we have the node in our routing table already
if let Some(nr) = routing_table.lookup_node_ref(node_id) {
// ensure we have dial_info for the entry already,
// ensure we have some dial info for the entry already,
// if not, we should do the find_node anyway
if !nr.has_dial_info() {
if !nr.has_any_dial_info() {
return Ok(nr);
}
}
@@ -377,8 +376,8 @@ impl RPCProcessor {
}
}
Ok((rpcreader, _)) => {
// Note that we definitely received this peer info since we got a reply
waitable_reply.node_ref.set_seen_our_dial_info();
// Note that we definitely received this node info since we got a reply
waitable_reply.node_ref.set_seen_our_node_info();
// Reply received
let recv_ts = get_timestamp();
@@ -434,10 +433,20 @@ impl RPCProcessor {
let out;
// To where are we sending the request
match dest {
Destination::Direct(node_ref) => {
match &dest {
Destination::Direct(node_ref) | Destination::Normal(node_ref) => {
// Send to a node without a private route
// --------------------------------------
// Get the actual destination node id, accounting for outbound relaying
let (node_ref, node_id) = if matches!(dest, Destination::Normal(_)) {
self.get_direct_destination(node_ref.clone())?
} else {
let node_id = node_ref.node_id();
(node_ref.clone(), node_id)
};
// Handle the existence of safety route
match safety_route_spec {
None => {
// If no safety route is being used, and we're not sending to a private
@@ -445,7 +454,7 @@ impl RPCProcessor {
out = reader_to_vec(message)?;
// Message goes directly to the node
out_node_id = node_ref.node_id();
out_node_id = node_id;
out_noderef = Some(node_ref);
hopcount = 1;
}
@@ -454,7 +463,7 @@ impl RPCProcessor {
// but we are using a safety route, so we must create an empty private route
let mut pr_builder = ::capnp::message::Builder::new_default();
let private_route =
self.new_stub_private_route(node_ref.node_id(), &mut pr_builder)?;
self.new_stub_private_route(node_id, &mut pr_builder)?;
let message_vec = reader_to_vec(message)?;
// first
@@ -487,7 +496,7 @@ impl RPCProcessor {
None => {
// If no safety route, the first node is the first hop of the private route
hopcount = private_route.hop_count as usize;
let out_node_id = match private_route.hops {
let out_node_id = match &private_route.hops {
Some(rh) => rh.dial_info.node_id.key,
_ => return Err(rpc_error_internal("private route has no hops")),
};
@@ -547,7 +556,7 @@ impl RPCProcessor {
let bytes = out.len() as u64;
if let Err(e) = self
.network_manager()
.send_envelope(node_ref.clone(), out)
.send_envelope(node_ref.clone(), Some(out_node_id), out)
.await
.map_err(logthru_rpc!(error))
.map_err(RPCError::Internal)
@@ -735,7 +744,7 @@ impl RPCProcessor {
// Send the reply
let bytes = out.len() as u64;
self.network_manager()
.send_envelope(node_ref.clone(), out)
.send_envelope(node_ref.clone(), Some(out_node_id), out)
.await
.map_err(RPCError::Internal)?;
@@ -762,17 +771,17 @@ impl RPCProcessor {
}
}
fn get_respond_to_sender_dial_info(
fn get_respond_to_sender_node_info(
&self,
operation: &veilid_capnp::operation::Reader,
) -> Result<Option<DialInfo>, RPCError> {
if let veilid_capnp::operation::respond_to::Sender(Ok(sender_di_reader)) = operation
) -> Result<Option<NodeInfo>, RPCError> {
if let veilid_capnp::operation::respond_to::Sender(Ok(sender_ni_reader)) = operation
.get_respond_to()
.which()
.map_err(map_error_capnp_notinschema!())?
{
// Sender DialInfo was specified, update our routing table with it
Ok(Some(decode_dial_info(&sender_di_reader)?))
Ok(Some(decode_node_info(&sender_ni_reader, true)?))
} else {
Ok(None)
}
@@ -809,17 +818,17 @@ impl RPCProcessor {
};
// Parse out fields
let node_info = decode_node_info(
let node_status = decode_node_status(
&iq_reader
.get_node_info()
.map_err(map_error_internal!("no valid node info"))?,
.get_node_status()
.map_err(map_error_internal!("no valid node status"))?,
)?;
// add node information for the requesting node to our routing table
// update node status for the requesting node to our routing table
if let Some(sender_nr) = rpcreader.opt_sender_nr.clone() {
// Update latest node info in routing table for the infoq sender
// Update latest node status in routing table for the infoq sender
sender_nr.operate(|e| {
e.update_node_info(node_info);
e.update_node_status(node_status);
});
}
@@ -831,10 +840,12 @@ impl RPCProcessor {
respond_to.set_none(());
let detail = answer.reborrow().init_detail();
let mut info_a = detail.init_info_a();
// Add node info
let node_info = self.network_manager().generate_node_info();
let mut nib = info_a.reborrow().init_node_info();
encode_node_info(&node_info, &mut nib)?;
// Add node status
let node_status = self.network_manager().generate_node_status();
let mut nsb = info_a.reborrow().init_node_status();
encode_node_status(&node_status, &mut nsb)?;
// Add sender info
let sender_info = self.generate_sender_info(&rpcreader);
let mut sib = info_a.reborrow().init_sender_info();
@@ -898,7 +909,7 @@ impl RPCProcessor {
for peer in peers {
// See if this peer will validate dial info
let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| {
if let Some(ni) = &e.peer_stats().node_info {
if let Some(ni) = &e.peer_stats().status {
ni.will_validate_dial_info
} else {
true
@@ -980,23 +991,14 @@ impl RPCProcessor {
.map_err(logthru_rpc!())?,
);
// get the peerinfo/dialinfos of the requesting node
let dil_reader = fnq_reader
// get the sender NodeInfo of the requesting node
let sni_reader = fnq_reader
.reborrow()
.get_dial_info_list()
.get_sender_node_info()
.map_err(map_error_capnp_error!())?;
let mut dial_infos = Vec::<DialInfo>::with_capacity(
dil_reader
.len()
.try_into()
.map_err(map_error_protocol!("too many dial infos"))?,
);
for di in dil_reader.iter() {
dial_infos.push(decode_dial_info(&di)?)
}
let peer_info = PeerInfo {
node_id: NodeId::new(rpcreader.header.envelope.get_sender_id()),
dial_infos,
node_info: decode_node_info(&sni_reader, true)?,
};
// filter out attempts to pass non-public addresses in for peers
@@ -1007,19 +1009,17 @@ impl RPCProcessor {
// add node information for the requesting node to our routing table
let routing_table = self.routing_table();
let _requesting_node_ref = routing_table
.register_node_with_dial_info(peer_info.node_id.key, &peer_info.dial_infos)
.register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)
.map_err(map_error_string!())?;
// find N nodes closest to the target node in our routing table
let own_peer_info = routing_table.get_own_peer_info(self.default_peer_scope);
let own_peer_info = routing_table.get_own_peer_info();
let closest_nodes = routing_table.find_closest_nodes(
target_node_id,
// filter
None,
// transform
|e| {
RoutingTable::transform_to_peer_info(e, self.default_peer_scope, &own_peer_info)
},
|e| RoutingTable::transform_to_peer_info(e, &own_peer_info),
);
log_rpc!(">>>> Returning {} closest peers", closest_nodes.len());
@@ -1076,8 +1076,8 @@ impl RPCProcessor {
Err(rpc_error_unimplemented("process_find_block_q"))
}
async fn process_signal_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
Err(rpc_error_unimplemented("process_signal_q"))
async fn process_signal(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> {
Err(rpc_error_unimplemented("process_signal"))
}
async fn process_return_receipt(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> {
@@ -1176,28 +1176,27 @@ impl RPCProcessor {
veilid_capnp::operation::detail::SupplyBlockA(_) => (14u32, false),
veilid_capnp::operation::detail::FindBlockQ(_) => (15u32, true),
veilid_capnp::operation::detail::FindBlockA(_) => (16u32, false),
veilid_capnp::operation::detail::SignalQ(_) => (17u32, true),
veilid_capnp::operation::detail::SignalA(_) => (18u32, false),
veilid_capnp::operation::detail::ReturnReceipt(_) => (19u32, true),
veilid_capnp::operation::detail::StartTunnelQ(_) => (20u32, true),
veilid_capnp::operation::detail::StartTunnelA(_) => (21u32, false),
veilid_capnp::operation::detail::CompleteTunnelQ(_) => (22u32, true),
veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false),
veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true),
veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false),
veilid_capnp::operation::detail::Signal(_) => (17u32, true),
veilid_capnp::operation::detail::ReturnReceipt(_) => (18u32, true),
veilid_capnp::operation::detail::StartTunnelQ(_) => (19u32, true),
veilid_capnp::operation::detail::StartTunnelA(_) => (20u32, false),
veilid_capnp::operation::detail::CompleteTunnelQ(_) => (21u32, true),
veilid_capnp::operation::detail::CompleteTunnelA(_) => (22u32, false),
veilid_capnp::operation::detail::CancelTunnelQ(_) => (23u32, true),
veilid_capnp::operation::detail::CancelTunnelA(_) => (24u32, false),
};
// Accounting for questions we receive
if is_q {
// See if we have some Sender DialInfo to incorporate
// See if we have some Sender NodeInfo to incorporate
opt_sender_nr =
if let Some(sender_di) = self.get_respond_to_sender_dial_info(&operation)? {
// Sender DialInfo was specified, update our routing table with it
if let Some(sender_ni) = self.get_respond_to_sender_node_info(&operation)? {
// Sender NodeInfo was specified, update our routing table with it
let nr = self
.routing_table()
.update_node_with_single_dial_info(
.register_node_with_node_info(
msg.header.envelope.get_sender_id(),
&sender_di,
sender_ni,
)
.map_err(RPCError::Internal)?;
Some(nr)
@@ -1251,15 +1250,14 @@ impl RPCProcessor {
14 => self.process_answer(rpcreader).await, // SupplyBlockA
15 => self.process_find_block_q(rpcreader).await, // FindBlockQ
16 => self.process_answer(rpcreader).await, // FindBlockA
17 => self.process_signal_q(rpcreader).await, // SignalQ
18 => self.process_answer(rpcreader).await, // SignalA
19 => self.process_return_receipt(rpcreader).await, // ReturnReceipt
20 => self.process_start_tunnel_q(rpcreader).await, // StartTunnelQ
21 => self.process_answer(rpcreader).await, // StartTunnelA
22 => self.process_complete_tunnel_q(rpcreader).await, // CompleteTunnelQ
23 => self.process_answer(rpcreader).await, // CompleteTunnelA
24 => self.process_cancel_tunnel_q(rpcreader).await, // CancelTunnelQ
25 => self.process_answer(rpcreader).await, // CancelTunnelA
17 => self.process_signal(rpcreader).await, // SignalQ
18 => self.process_return_receipt(rpcreader).await, // ReturnReceipt
19 => self.process_start_tunnel_q(rpcreader).await, // StartTunnelQ
20 => self.process_answer(rpcreader).await, // StartTunnelA
21 => self.process_complete_tunnel_q(rpcreader).await, // CompleteTunnelQ
22 => self.process_answer(rpcreader).await, // CompleteTunnelA
23 => self.process_cancel_tunnel_q(rpcreader).await, // CancelTunnelQ
24 => self.process_answer(rpcreader).await, // CancelTunnelA
_ => panic!("must update rpc table"),
}
}
@@ -1361,19 +1359,15 @@ impl RPCProcessor {
// Gets a 'RespondTo::Sender' that contains either our dial info,
// or None if the peer has seen our dial info before
pub fn get_respond_to_sender(&self, peer: NodeRef) -> RespondTo {
if peer.has_seen_our_dial_info() {
if peer.has_seen_our_node_info() {
RespondTo::Sender(None)
} else if let Some(did) = self
.routing_table()
.first_filtered_dial_info_detail(peer.dial_info_filter())
{
RespondTo::Sender(Some(did.dial_info))
} else {
RespondTo::Sender(None)
RespondTo::Sender(Some(self.routing_table().get_own_peer_info().node_info))
}
}
// Send InfoQ RPC request, receive InfoA answer
// Can be sent via relays, but not via routes
pub async fn rpc_call_info(self, peer: NodeRef) -> Result<InfoAnswer, RPCError> {
let info_q_msg = {
let mut info_q_msg = ::capnp::message::Builder::new_default();
@@ -1384,9 +1378,9 @@ impl RPCProcessor {
.encode(&mut respond_to)?;
let detail = question.reborrow().init_detail();
let mut iqb = detail.init_info_q();
let mut node_info_builder = iqb.reborrow().init_node_info();
let node_info = self.network_manager().generate_node_info();
encode_node_info(&node_info, &mut node_info_builder)?;
let mut node_status_builder = iqb.reborrow().init_node_status();
let node_status = self.network_manager().generate_node_status();
encode_node_status(&node_status, &mut node_status_builder)?;
info_q_msg.into_reader()
};
@@ -1418,13 +1412,13 @@ impl RPCProcessor {
};
// Decode node info
if !info_a.has_node_info() {
return Err(rpc_error_internal("Missing node info"));
if !info_a.has_node_status() {
return Err(rpc_error_internal("Missing node status"));
}
let nir = info_a
.get_node_info()
.map_err(map_error_internal!("Broken node info"))?;
let node_info = decode_node_info(&nir)?;
let nsr = info_a
.get_node_status()
.map_err(map_error_internal!("Broken node status"))?;
let node_status = decode_node_status(&nsr)?;
// Decode sender info
let sender_info = if info_a.has_sender_info() {
@@ -1436,21 +1430,22 @@ impl RPCProcessor {
SenderInfo::default()
};
// Update latest node info in routing table
// Update latest node status in routing table
peer.operate(|e| {
e.update_node_info(node_info.clone());
e.update_node_status(node_status.clone());
});
// Return the answer for anyone who may care
let out = InfoAnswer {
latency,
node_info,
node_status,
sender_info,
};
Ok(out)
}
// Can only be sent directly, not via relays or routes
pub async fn rpc_call_validate_dial_info(
&self,
peer: NodeRef,
@@ -1496,7 +1491,8 @@ impl RPCProcessor {
};
// Send the validate_dial_info request
self.request(Destination::Direct(peer.clone()), vdi_msg, None)
// This can only be sent directly, as relays can not validate dial info
self.request(Destination::Direct(peer), vdi_msg, None)
.await?;
// Wait for receipt
@@ -1510,6 +1506,7 @@ impl RPCProcessor {
}
// Send FindNodeQ RPC request, receive FindNodeA answer
// Can be sent via all methods including relays and routes
pub async fn rpc_call_find_node(
self,
dest: Destination,
@@ -1528,22 +1525,10 @@ impl RPCProcessor {
let mut node_id_builder = fnq.reborrow().init_node_id();
encode_public_key(&key, &mut node_id_builder)?;
let own_peer_info = self
.routing_table()
.get_own_peer_info(self.default_peer_scope);
let own_peer_info = self.routing_table().get_own_peer_info();
let mut dil_builder = fnq.reborrow().init_dial_info_list(
own_peer_info
.dial_infos
.len()
.try_into()
.map_err(map_error_internal!("too many dial infos in peer info"))?,
);
for idx in 0..own_peer_info.dial_infos.len() {
let mut di_builder = dil_builder.reborrow().get(idx as u32);
encode_dial_info(&own_peer_info.dial_infos[idx], &mut di_builder)?;
}
let mut ni_builder = fnq.reborrow().init_sender_node_info();
encode_node_info(&own_peer_info.node_info, &mut ni_builder)?;
find_node_q_msg.into_reader()
};
@@ -1584,7 +1569,7 @@ impl RPCProcessor {
.map_err(map_error_internal!("too many peers"))?,
);
for p in peers_reader.iter() {
let peer_info = decode_peer_info(&p)?;
let peer_info = decode_peer_info(&p, true)?;
if !self.filter_peer_scope(&peer_info) {
return Err(RPCError::InvalidFormat);
@@ -1598,5 +1583,35 @@ impl RPCProcessor {
Ok(out)
}
// Sends a unidirectional signal to a node
// Can be sent via all methods including relays and routes
pub async fn rpc_call_signal(
&self,
dest: Destination,
relay_dial_info: DialInfo,
safety_route: Option<&SafetyRouteSpec>,
signal_info: SignalInfo,
) -> Result<(), RPCError> {
let network_manager = self.network_manager();
//
let sig_msg = {
let mut sig_msg = ::capnp::message::Builder::new_default();
let mut question = sig_msg.init_root::<veilid_capnp::operation::Builder>();
question.set_op_id(self.get_next_op_id());
let mut respond_to = question.reborrow().init_respond_to();
respond_to.set_none(());
let detail = question.reborrow().init_detail();
let mut sig_builder = detail.init_signal();
encode_signal_info(&signal_info, &mut sig_builder)?;
sig_msg.into_reader()
};
// Send the signal request
self.request(dest, sig_msg, safety_route).await?;
Ok(())
}
// xxx do not process latency for routed messages
}