This commit is contained in:
John Smith
2021-11-27 12:44:21 -05:00
parent d1f728954c
commit 028e02f942
31 changed files with 190 additions and 207 deletions

View File

@@ -24,6 +24,6 @@ pub fn decode_node_dial_info_single(
Ok(NodeDialInfoSingle {
node_id: NodeId::new(node_id),
dial_info: dial_info,
dial_info,
})
}

View File

@@ -45,6 +45,6 @@ pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result<Peer
}
Ok(PeerInfo {
node_id: NodeId::new(decode_public_key(&nid_reader)),
dial_infos: dial_infos,
dial_infos,
})
}

View File

@@ -67,7 +67,7 @@ pub fn encode_route_hop(
)?;
if let Some(rhd) = &route_hop.next_hop {
let mut rhd_builder = builder.reborrow().init_next_hop();
encode_route_hop_data(&rhd, &mut rhd_builder)?;
encode_route_hop_data(rhd, &mut rhd_builder)?;
}
Ok(())
}
@@ -83,7 +83,7 @@ pub fn encode_private_route(
builder.set_hop_count(private_route.hop_count);
if let Some(rh) = &private_route.hops {
let mut rh_builder = builder.reborrow().init_first_hop();
encode_route_hop(&rh, &mut rh_builder)?;
encode_route_hop(rh, &mut rh_builder)?;
};
Ok(())
@@ -102,11 +102,11 @@ pub fn encode_safety_route(
match &safety_route.hops {
SafetyRouteHops::Data(rhd) => {
let mut rhd_builder = h_builder.init_data();
encode_route_hop_data(&rhd, &mut rhd_builder)?;
encode_route_hop_data(rhd, &mut rhd_builder)?;
}
SafetyRouteHops::Private(pr) => {
let mut pr_builder = h_builder.init_private();
encode_private_route(&pr, &mut pr_builder)?;
encode_private_route(pr, &mut pr_builder)?;
}
};
@@ -129,10 +129,7 @@ pub fn decode_route_hop_data(
.map_err(map_error_internal!("invalid blob in route hop data"))?
.to_vec();
Ok(RouteHopData {
nonce: nonce,
blob: blob,
})
Ok(RouteHopData { nonce, blob })
}
pub fn decode_route_hop(reader: &veilid_capnp::route_hop::Reader) -> Result<RouteHop, RPCError> {
@@ -153,8 +150,8 @@ pub fn decode_route_hop(reader: &veilid_capnp::route_hop::Reader) -> Result<Rout
};
Ok(RouteHop {
dial_info: dial_info,
next_hop: next_hop,
dial_info,
next_hop,
})
}
@@ -177,9 +174,9 @@ pub fn decode_private_route(
};
Ok(PrivateRoute {
public_key: public_key,
hop_count: hop_count,
hops: hops,
public_key,
hop_count,
hops,
})
}
@@ -205,8 +202,8 @@ pub fn decode_safety_route(
};
Ok(SafetyRoute {
public_key: public_key,
hop_count: hop_count,
hops: hops,
public_key,
hop_count,
hops,
})
}

View File

@@ -28,7 +28,5 @@ pub fn decode_sender_info(
} else {
None
};
Ok(SenderInfo {
socket_address: socket_address,
})
Ok(SenderInfo { socket_address })
}

View File

@@ -116,7 +116,7 @@ impl RespondTo {
}
Self::PrivateRoute(pr) => {
let mut pr_builder = builder.reborrow().init_private_route();
encode_private_route(&pr, &mut pr_builder)?;
encode_private_route(pr, &mut pr_builder)?;
}
};
Ok(())
@@ -137,7 +137,7 @@ struct RPCMessageData {
}
impl ReaderSegments for RPCMessageData {
fn get_segment<'a>(&'a self, idx: u32) -> Option<&'a [u8]> {
fn get_segment(&self, idx: u32) -> Option<&[u8]> {
if idx > 0 {
None
} else {
@@ -539,12 +539,13 @@ impl RPCProcessor {
) -> Result<(), RPCError> {
let eventual = {
let mut inner = self.inner.lock();
inner.waiting_rpc_table.remove(&op_id)
inner
.waiting_rpc_table
.remove(&op_id)
.ok_or_else(|| rpc_error_internal("Unmatched operation id"))?
};
match eventual {
None => Err(rpc_error_internal("Unmatched operation id")),
Some(e) => Ok(e.resolve(rpcreader).await),
}
eventual.resolve(rpcreader).await;
Ok(())
}
// wait for reply
@@ -661,11 +662,11 @@ impl RPCProcessor {
out_node_id = sr
.hops
.first()
.ok_or(rpc_error_internal("no hop in safety route"))?
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
.dial_info
.node_id
.key;
out = self.wrap_with_route(Some(&sr), private_route, message_vec)?;
out = self.wrap_with_route(Some(sr), private_route, message_vec)?;
hopcount = 1 + sr.hops.len();
}
};
@@ -700,11 +701,11 @@ impl RPCProcessor {
let out_node_id = sr
.hops
.first()
.ok_or(rpc_error_internal("no hop in safety route"))?
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
.dial_info
.node_id
.key;
out = self.wrap_with_route(Some(&sr), pr_reader, message_vec)?;
out = self.wrap_with_route(Some(sr), pr_reader, message_vec)?;
out_node_id
}
}
@@ -746,7 +747,7 @@ impl RPCProcessor {
.network_manager()
.send_envelope(node_ref.clone(), out)
.await
.map_err(|e| RPCError::Internal(e))
.map_err(RPCError::Internal)
{
// Make sure to clean up op id waiter in case of error
if eventual.is_some() {
@@ -851,7 +852,7 @@ impl RPCProcessor {
out_node_id = sr
.hops
.first()
.ok_or(rpc_error_internal("no hop in safety route"))?
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
.dial_info
.node_id
.key;
@@ -869,8 +870,7 @@ impl RPCProcessor {
};
// Reply with 'route' operation
out =
self.wrap_with_route(safety_route_spec.clone(), private_route, reply_vec)?;
out = self.wrap_with_route(safety_route_spec, private_route, reply_vec)?;
out_node_id = match safety_route_spec {
None => {
// If no safety route, the first node is the first hop of the private route
@@ -891,7 +891,7 @@ impl RPCProcessor {
// If safety route is in use, first node is the first hop of the safety route
sr.hops
.first()
.ok_or(rpc_error_internal("no hop in safety route"))?
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
.dial_info
.node_id
.key
@@ -919,7 +919,7 @@ impl RPCProcessor {
self.network_manager()
.send_envelope(node_ref.clone(), out)
.await
.map_err(|e| RPCError::Internal(e))?;
.map_err(RPCError::Internal)?;
// Reply successfully sent
let send_ts = get_timestamp();
@@ -971,7 +971,7 @@ impl RPCProcessor {
}
// xxx: bandwidth limiting here, don't commit to doing info redirects if our network quality sucks
return true;
true
}
//////////////////////////////////////////////////////////////////////
@@ -1094,7 +1094,7 @@ impl RPCProcessor {
let routing_table = self.routing_table();
let protocol_address_type = dial_info.protocol_address_type();
let peers = routing_table.get_fast_nodes_of_type(protocol_address_type);
if peers.len() == 0 {
if peers.is_empty() {
return Err(rpc_error_internal(format!(
"no peers of type '{:?}'",
protocol_address_type
@@ -1222,9 +1222,9 @@ impl RPCProcessor {
.try_into()
.map_err(map_error_internal!("invalid closest nodes list length"))?,
);
for i in 0..closest_nodes.len() {
for (i, closest_node) in closest_nodes.iter().enumerate() {
let mut pi_builder = peers_builder.reborrow().get(i as u32);
encode_peer_info(&closest_nodes[i], &mut pi_builder)?;
encode_peer_info(closest_node, &mut pi_builder)?;
}
reply_msg.into_reader()
};
@@ -1440,19 +1440,9 @@ impl RPCProcessor {
}
async fn rpc_worker(self, receiver: Receiver<RPCMessage>) {
loop {
let msg = match receiver.recv().await {
Ok(v) => v,
Err(_) => {
break;
}
};
match self.process_rpc_message(msg).await {
Ok(_) => (),
Err(e) => {
error!("Couldn't process rpc message: {}", e);
}
while let Ok(msg) = receiver.recv().await {
if let Err(e) = self.process_rpc_message(msg).await {
error!("Couldn't process rpc message: {}", e);
}
}
}
@@ -1651,9 +1641,9 @@ impl RPCProcessor {
// Wait for receipt
match eventual_value.await {
ReceiptEvent::RETURNED => Ok(true),
ReceiptEvent::EXPIRED => Ok(false),
ReceiptEvent::CANCELLED => Err(rpc_error_internal(
ReceiptEvent::Returned => Ok(true),
ReceiptEvent::Expired => Ok(false),
ReceiptEvent::Cancelled => Err(rpc_error_internal(
"receipt was dropped before expiration".to_owned(),
)),
}
@@ -1685,7 +1675,7 @@ impl RPCProcessor {
} else {
PeerScope::All
});
if own_peer_info.dial_infos.len() == 0 {
if own_peer_info.dial_infos.is_empty() {
return Err(rpc_error_internal("No valid public dial info for own node"));
}