refactor for routing domains
This commit is contained in:
@@ -390,8 +390,7 @@ impl RPCProcessor {
|
||||
Err(_) | Ok(TimeoutOr::Timeout) => {
|
||||
self.cancel_op_id_waiter(waitable_reply.op_id);
|
||||
|
||||
self.routing_table()
|
||||
.stats_question_lost(waitable_reply.node_ref.clone());
|
||||
waitable_reply.node_ref.stats_question_lost();
|
||||
}
|
||||
Ok(TimeoutOr::Value((rpcreader, _))) => {
|
||||
// Note that the remote node definitely received this node info since we got a reply
|
||||
@@ -399,8 +398,7 @@ impl RPCProcessor {
|
||||
|
||||
// Reply received
|
||||
let recv_ts = intf::get_timestamp();
|
||||
self.routing_table().stats_answer_rcvd(
|
||||
waitable_reply.node_ref,
|
||||
waitable_reply.node_ref.stats_answer_rcvd(
|
||||
waitable_reply.send_ts,
|
||||
recv_ts,
|
||||
rpcreader.header.body_len,
|
||||
@@ -595,20 +593,19 @@ impl RPCProcessor {
|
||||
.map_err(|e| {
|
||||
// If we're returning an error, clean up
|
||||
self.cancel_op_id_waiter(op_id);
|
||||
self.routing_table()
|
||||
.stats_failed_to_send(node_ref.clone(), send_ts, true);
|
||||
RPCError::network(e) })?
|
||||
=> {
|
||||
node_ref
|
||||
.stats_failed_to_send(send_ts, true);
|
||||
RPCError::network(e)
|
||||
})? => {
|
||||
// If we couldn't send we're still cleaning up
|
||||
self.cancel_op_id_waiter(op_id);
|
||||
self.routing_table()
|
||||
.stats_failed_to_send(node_ref, send_ts, true);
|
||||
node_ref
|
||||
.stats_failed_to_send(send_ts, true);
|
||||
}
|
||||
);
|
||||
|
||||
// Successfully sent
|
||||
self.routing_table()
|
||||
.stats_question_sent(node_ref.clone(), send_ts, bytes, true);
|
||||
node_ref.stats_question_sent(send_ts, bytes, true);
|
||||
|
||||
// Pass back waitable reply completion
|
||||
Ok(NetworkResult::value(WaitableReply {
|
||||
@@ -663,18 +660,18 @@ impl RPCProcessor {
|
||||
.await
|
||||
.map_err(|e| {
|
||||
// If we're returning an error, clean up
|
||||
self.routing_table()
|
||||
.stats_failed_to_send(node_ref.clone(), send_ts, true);
|
||||
RPCError::network(e) })? => {
|
||||
node_ref
|
||||
.stats_failed_to_send(send_ts, true);
|
||||
RPCError::network(e)
|
||||
})? => {
|
||||
// If we couldn't send we're still cleaning up
|
||||
self.routing_table()
|
||||
.stats_failed_to_send(node_ref, send_ts, true);
|
||||
node_ref
|
||||
.stats_failed_to_send(send_ts, true);
|
||||
}
|
||||
);
|
||||
|
||||
// Successfully sent
|
||||
self.routing_table()
|
||||
.stats_question_sent(node_ref.clone(), send_ts, bytes, true);
|
||||
node_ref.stats_question_sent(send_ts, bytes, true);
|
||||
|
||||
Ok(NetworkResult::value(()))
|
||||
}
|
||||
@@ -755,17 +752,18 @@ impl RPCProcessor {
|
||||
.await
|
||||
.map_err(|e| {
|
||||
// If we're returning an error, clean up
|
||||
self.routing_table()
|
||||
.stats_failed_to_send(node_ref.clone(), send_ts, true);
|
||||
RPCError::network(e) })? => {
|
||||
node_ref
|
||||
.stats_failed_to_send(send_ts, true);
|
||||
RPCError::network(e)
|
||||
})? => {
|
||||
// If we couldn't send we're still cleaning up
|
||||
self.routing_table()
|
||||
.stats_failed_to_send(node_ref.clone(), send_ts, false);
|
||||
node_ref
|
||||
.stats_failed_to_send(send_ts, false);
|
||||
}
|
||||
);
|
||||
|
||||
// Reply successfully sent
|
||||
self.routing_table().stats_answer_sent(node_ref, bytes);
|
||||
node_ref.stats_answer_sent(bytes);
|
||||
|
||||
Ok(NetworkResult::value(()))
|
||||
}
|
||||
@@ -828,21 +826,13 @@ impl RPCProcessor {
|
||||
let kind = match msg.operation.kind() {
|
||||
RPCOperationKind::Question(_) => {
|
||||
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
|
||||
self.routing_table().stats_question_rcvd(
|
||||
sender_nr,
|
||||
msg.header.timestamp,
|
||||
msg.header.body_len,
|
||||
);
|
||||
sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
|
||||
}
|
||||
"question"
|
||||
}
|
||||
RPCOperationKind::Statement(_) => {
|
||||
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
|
||||
self.routing_table().stats_question_rcvd(
|
||||
sender_nr,
|
||||
msg.header.timestamp,
|
||||
msg.header.body_len,
|
||||
);
|
||||
sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
|
||||
}
|
||||
"statement"
|
||||
}
|
||||
|
@@ -61,19 +61,27 @@ impl RPCProcessor {
|
||||
|
||||
// add node information for the requesting node to our routing table
|
||||
let routing_table = self.routing_table();
|
||||
let rt2 = routing_table.clone();
|
||||
let rt3 = routing_table.clone();
|
||||
|
||||
// find N nodes closest to the target node in our routing table
|
||||
let own_peer_info = routing_table.get_own_peer_info();
|
||||
let own_peer_info = routing_table.get_own_peer_info(RoutingDomain::PublicInternet);
|
||||
let own_peer_info_is_valid = own_peer_info.signed_node_info.is_valid();
|
||||
|
||||
let closest_nodes = routing_table.find_closest_nodes(
|
||||
find_node_q.node_id,
|
||||
// filter
|
||||
Some(move |_k, v| {
|
||||
RoutingTable::filter_has_valid_signed_node_info(v, own_peer_info_is_valid)
|
||||
rt2.filter_has_valid_signed_node_info(
|
||||
v,
|
||||
own_peer_info_is_valid,
|
||||
Some(RoutingDomain::PublicInternet),
|
||||
)
|
||||
}),
|
||||
// transform
|
||||
move |k, v| RoutingTable::transform_to_peer_info(k, v, &own_peer_info),
|
||||
move |k, v| {
|
||||
rt3.transform_to_peer_info(RoutingDomain::PublicInternet, k, v, &own_peer_info)
|
||||
},
|
||||
);
|
||||
|
||||
// Make status answer
|
||||
|
@@ -10,6 +10,7 @@ impl RPCProcessor {
|
||||
safety_route: Option<&SafetyRouteSpec>,
|
||||
) -> Result<NetworkResult<()>, RPCError> {
|
||||
let signed_node_info = self.routing_table().get_own_signed_node_info();
|
||||
xxx add routing domain to capnp....
|
||||
let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info };
|
||||
let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update));
|
||||
|
||||
|
@@ -38,9 +38,7 @@ impl RPCProcessor {
|
||||
};
|
||||
|
||||
// Update latest node status in routing table
|
||||
peer.operate_mut(|e| {
|
||||
e.update_node_status(status_a.node_status.clone());
|
||||
});
|
||||
peer.update_node_status(status_a.node_status.clone());
|
||||
|
||||
// Report sender_info IP addresses to network manager
|
||||
if let Some(socket_address) = status_a.sender_info.socket_address {
|
||||
@@ -90,9 +88,7 @@ impl RPCProcessor {
|
||||
// update node status for the requesting node to our routing table
|
||||
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
|
||||
// Update latest node status in routing table for the statusq sender
|
||||
sender_nr.operate_mut(|e| {
|
||||
e.update_node_status(status_q.node_status.clone());
|
||||
});
|
||||
sender_nr.update_node_status(status_q.node_status.clone());
|
||||
}
|
||||
|
||||
// Make status answer
|
||||
|
Reference in New Issue
Block a user