route spec store work
This commit is contained in:
parent
4a9d516f32
commit
5935ca9e41
@ -4,6 +4,11 @@ use rkyv::{
|
|||||||
with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
|
with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// The size of the remote private route cache
|
||||||
|
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;
|
||||||
|
/// Remote private route cache entries expire in 5 minutes if they haven't been used
|
||||||
|
const REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY: u64 = 300_000_000u64;
|
||||||
|
|
||||||
/// Compiled route (safety route + private route)
|
/// Compiled route (safety route + private route)
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct CompiledRoute {
|
pub struct CompiledRoute {
|
||||||
@ -73,8 +78,21 @@ pub struct RouteSpecStoreContent {
|
|||||||
details: HashMap<DHTKey, RouteSpecDetail>,
|
details: HashMap<DHTKey, RouteSpecDetail>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// What remote private routes have seen
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct RemotePrivateRouteInfo {
|
||||||
|
// When this remote private route was last modified
|
||||||
|
modified_ts: u64,
|
||||||
|
/// Did this remote private route see our node info due to no safety route in use
|
||||||
|
seen_our_node_info: bool,
|
||||||
|
/// The time this remote private route last responded
|
||||||
|
last_replied_ts: Option<u64>,
|
||||||
|
/// Timestamp of when the route was last used for anything
|
||||||
|
last_used_ts: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Ephemeral data used to help the RouteSpecStore operate efficiently
|
/// Ephemeral data used to help the RouteSpecStore operate efficiently
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug)]
|
||||||
pub struct RouteSpecStoreCache {
|
pub struct RouteSpecStoreCache {
|
||||||
/// How many times nodes have been used
|
/// How many times nodes have been used
|
||||||
used_nodes: HashMap<DHTKey, usize>,
|
used_nodes: HashMap<DHTKey, usize>,
|
||||||
@ -82,6 +100,19 @@ pub struct RouteSpecStoreCache {
|
|||||||
used_end_nodes: HashMap<DHTKey, usize>,
|
used_end_nodes: HashMap<DHTKey, usize>,
|
||||||
/// Route spec hop cache, used to quickly disqualify routes
|
/// Route spec hop cache, used to quickly disqualify routes
|
||||||
hop_cache: HashSet<Vec<u8>>,
|
hop_cache: HashSet<Vec<u8>>,
|
||||||
|
/// Has a remote private route responded to a question and when
|
||||||
|
remote_private_route_cache: LruCache<DHTKey, RemotePrivateRouteInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RouteSpecStoreCache {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
used_nodes: Default::default(),
|
||||||
|
used_end_nodes: Default::default(),
|
||||||
|
hop_cache: Default::default(),
|
||||||
|
remote_private_route_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -388,7 +419,7 @@ impl RouteSpecStore {
|
|||||||
sequencing: Sequencing,
|
sequencing: Sequencing,
|
||||||
hop_count: usize,
|
hop_count: usize,
|
||||||
directions: DirectionSet,
|
directions: DirectionSet,
|
||||||
avoid_node_id: Option<DHTKey>,
|
avoid_node_ids: &[DHTKey],
|
||||||
) -> EyreResult<Option<DHTKey>> {
|
) -> EyreResult<Option<DHTKey>> {
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
let routing_table = self.unlocked_inner.routing_table.clone();
|
let routing_table = self.unlocked_inner.routing_table.clone();
|
||||||
@ -401,7 +432,7 @@ impl RouteSpecStore {
|
|||||||
sequencing,
|
sequencing,
|
||||||
hop_count,
|
hop_count,
|
||||||
directions,
|
directions,
|
||||||
avoid_node_id,
|
avoid_node_ids,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -413,7 +444,7 @@ impl RouteSpecStore {
|
|||||||
sequencing: Sequencing,
|
sequencing: Sequencing,
|
||||||
hop_count: usize,
|
hop_count: usize,
|
||||||
directions: DirectionSet,
|
directions: DirectionSet,
|
||||||
avoid_node_id: Option<DHTKey>,
|
avoid_node_ids: &[DHTKey],
|
||||||
) -> EyreResult<Option<DHTKey>> {
|
) -> EyreResult<Option<DHTKey>> {
|
||||||
use core::cmp::Ordering;
|
use core::cmp::Ordering;
|
||||||
|
|
||||||
@ -443,12 +474,10 @@ impl RouteSpecStore {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exclude node we have specifically chosen to avoid
|
// Exclude nodes we have specifically chosen to avoid
|
||||||
if let Some(ani) = avoid_node_id {
|
if avoid_node_ids.contains(&k) {
|
||||||
if k == ani {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route
|
// Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route
|
||||||
v.with(rti, move |_rti, e| {
|
v.with(rti, move |_rti, e| {
|
||||||
@ -758,17 +787,15 @@ impl RouteSpecStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Find first matching unpublished route that fits into the selection criteria
|
/// Find first matching unpublished route that fits into the selection criteria
|
||||||
pub fn first_unpublished_route(
|
fn first_unpublished_route_inner<'a>(
|
||||||
&self,
|
inner: &'a RouteSpecStoreInner,
|
||||||
min_hop_count: usize,
|
min_hop_count: usize,
|
||||||
max_hop_count: usize,
|
max_hop_count: usize,
|
||||||
stability: Stability,
|
stability: Stability,
|
||||||
sequencing: Sequencing,
|
sequencing: Sequencing,
|
||||||
directions: DirectionSet,
|
directions: DirectionSet,
|
||||||
avoid_node_id: Option<DHTKey>,
|
avoid_node_ids: &[DHTKey],
|
||||||
) -> Option<DHTKey> {
|
) -> Option<DHTKey> {
|
||||||
let inner = self.inner.lock();
|
|
||||||
|
|
||||||
for detail in &inner.content.details {
|
for detail in &inner.content.details {
|
||||||
if detail.1.stability >= stability
|
if detail.1.stability >= stability
|
||||||
&& detail.1.sequencing >= sequencing
|
&& detail.1.sequencing >= sequencing
|
||||||
@ -778,9 +805,10 @@ impl RouteSpecStore {
|
|||||||
&& !detail.1.published
|
&& !detail.1.published
|
||||||
{
|
{
|
||||||
let mut avoid = false;
|
let mut avoid = false;
|
||||||
if let Some(ani) = &avoid_node_id {
|
for h in &detail.1.hops {
|
||||||
if detail.1.hops.contains(ani) {
|
if avoid_node_ids.contains(h) {
|
||||||
avoid = true;
|
avoid = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !avoid {
|
if !avoid {
|
||||||
@ -864,65 +892,16 @@ impl RouteSpecStore {
|
|||||||
SafetySelection::Safe(safety_spec) => safety_spec,
|
SafetySelection::Safe(safety_spec) => safety_spec,
|
||||||
};
|
};
|
||||||
|
|
||||||
// See if the preferred route is here
|
// Get the safety route to use from the spec
|
||||||
let opt_safety_rsd: Option<(&mut RouteSpecDetail, DHTKey)> =
|
|
||||||
if let Some(preferred_route) = safety_spec.preferred_route {
|
|
||||||
Self::detail_mut(inner, &preferred_route).map(|rsd| (rsd, preferred_route))
|
|
||||||
} else {
|
|
||||||
// Preferred safety route was not requested
|
|
||||||
None
|
|
||||||
};
|
|
||||||
let (safety_rsd, sr_pubkey) = if let Some(safety_rsd) = opt_safety_rsd {
|
|
||||||
// Safety route exists
|
|
||||||
safety_rsd
|
|
||||||
} else {
|
|
||||||
// Avoid having the first node in the private route in our chosen safety route
|
|
||||||
// We would avoid all of them, but by design only the first node is knowable
|
|
||||||
let avoid_node_id = match &pr_first_hop.node {
|
let avoid_node_id = match &pr_first_hop.node {
|
||||||
RouteNode::NodeId(n) => n.key,
|
RouteNode::NodeId(n) => n.key,
|
||||||
RouteNode::PeerInfo(p) => p.node_id.key,
|
RouteNode::PeerInfo(p) => p.node_id.key,
|
||||||
};
|
};
|
||||||
|
let Some(sr_pubkey) = self.get_route_for_safety_spec_inner(inner, rti, &safety_spec, Direction::Outbound.into(), &[avoid_node_id])? else {
|
||||||
// Select a safety route from the pool or make one if we don't have one that matches
|
// No safety route could be found for this spec
|
||||||
if let Some(sr_pubkey) = self.first_unpublished_route(
|
return Ok(None);
|
||||||
safety_spec.hop_count,
|
|
||||||
safety_spec.hop_count,
|
|
||||||
safety_spec.stability,
|
|
||||||
safety_spec.sequencing,
|
|
||||||
Direction::Outbound.into(),
|
|
||||||
Some(avoid_node_id),
|
|
||||||
) {
|
|
||||||
// Found a route to use
|
|
||||||
(Self::detail_mut(inner, &sr_pubkey).unwrap(), sr_pubkey)
|
|
||||||
} else {
|
|
||||||
// No route found, gotta allocate one
|
|
||||||
let sr_pubkey = match self
|
|
||||||
.allocate_route_inner(
|
|
||||||
inner,
|
|
||||||
rti,
|
|
||||||
safety_spec.stability,
|
|
||||||
safety_spec.sequencing,
|
|
||||||
safety_spec.hop_count,
|
|
||||||
Direction::Outbound.into(),
|
|
||||||
Some(avoid_node_id),
|
|
||||||
)
|
|
||||||
.map_err(RPCError::internal)?
|
|
||||||
{
|
|
||||||
Some(pk) => pk,
|
|
||||||
None => return Ok(None),
|
|
||||||
};
|
};
|
||||||
(Self::detail_mut(inner, &sr_pubkey).unwrap(), sr_pubkey)
|
let safety_rsd = Self::detail_mut(inner, &sr_pubkey).unwrap();
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Ensure the total hop count isn't too long for our config
|
|
||||||
let sr_hopcount = safety_spec.hop_count;
|
|
||||||
if sr_hopcount == 0 {
|
|
||||||
bail!("safety route hop count is zero");
|
|
||||||
}
|
|
||||||
if sr_hopcount > max_route_hop_count {
|
|
||||||
bail!("safety route hop count too long");
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if we can optimize this compilation yet
|
// See if we can optimize this compilation yet
|
||||||
// We don't want to include full nodeinfo if we don't have to
|
// We don't want to include full nodeinfo if we don't have to
|
||||||
@ -951,7 +930,7 @@ impl RouteSpecStore {
|
|||||||
// Each loop mutates 'nonce', and 'blob_data'
|
// Each loop mutates 'nonce', and 'blob_data'
|
||||||
let mut nonce = Crypto::get_random_nonce();
|
let mut nonce = Crypto::get_random_nonce();
|
||||||
let crypto = routing_table.network_manager().crypto();
|
let crypto = routing_table.network_manager().crypto();
|
||||||
for h in (1..sr_hopcount).rev() {
|
for h in (1..safety_rsd.hops.len()).rev() {
|
||||||
// Get blob to encrypt for next hop
|
// Get blob to encrypt for next hop
|
||||||
blob_data = {
|
blob_data = {
|
||||||
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
|
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
|
||||||
@ -1046,6 +1025,84 @@ impl RouteSpecStore {
|
|||||||
Ok(Some(compiled_route))
|
Ok(Some(compiled_route))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a route that matches a particular safety spec
|
||||||
|
fn get_route_for_safety_spec_inner(
|
||||||
|
&self,
|
||||||
|
inner: &mut RouteSpecStoreInner,
|
||||||
|
rti: &RoutingTableInner,
|
||||||
|
safety_spec: &SafetySpec,
|
||||||
|
direction: DirectionSet,
|
||||||
|
avoid_node_ids: &[DHTKey],
|
||||||
|
) -> EyreResult<Option<DHTKey>> {
|
||||||
|
// Ensure the total hop count isn't too long for our config
|
||||||
|
let max_route_hop_count = self.unlocked_inner.max_route_hop_count;
|
||||||
|
if safety_spec.hop_count == 0 {
|
||||||
|
bail!("safety route hop count is zero");
|
||||||
|
}
|
||||||
|
if safety_spec.hop_count > max_route_hop_count {
|
||||||
|
bail!("safety route hop count too long");
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if the preferred route is here
|
||||||
|
if let Some(preferred_route) = safety_spec.preferred_route {
|
||||||
|
if inner.content.details.contains_key(&preferred_route) {
|
||||||
|
return Ok(Some(preferred_route));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Select a safety route from the pool or make one if we don't have one that matches
|
||||||
|
let sr_pubkey = if let Some(sr_pubkey) = Self::first_unpublished_route_inner(
|
||||||
|
inner,
|
||||||
|
safety_spec.hop_count,
|
||||||
|
safety_spec.hop_count,
|
||||||
|
safety_spec.stability,
|
||||||
|
safety_spec.sequencing,
|
||||||
|
direction,
|
||||||
|
avoid_node_ids,
|
||||||
|
) {
|
||||||
|
// Found a route to use
|
||||||
|
sr_pubkey
|
||||||
|
} else {
|
||||||
|
// No route found, gotta allocate one
|
||||||
|
let sr_pubkey = match self
|
||||||
|
.allocate_route_inner(
|
||||||
|
inner,
|
||||||
|
rti,
|
||||||
|
safety_spec.stability,
|
||||||
|
safety_spec.sequencing,
|
||||||
|
safety_spec.hop_count,
|
||||||
|
direction,
|
||||||
|
avoid_node_ids,
|
||||||
|
)
|
||||||
|
.map_err(RPCError::internal)?
|
||||||
|
{
|
||||||
|
Some(pk) => pk,
|
||||||
|
None => return Ok(None),
|
||||||
|
};
|
||||||
|
sr_pubkey
|
||||||
|
};
|
||||||
|
Ok(Some(sr_pubkey))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a private sroute to use for the answer to question
|
||||||
|
pub fn get_private_route_for_safety_spec(
|
||||||
|
&self,
|
||||||
|
safety_spec: &SafetySpec,
|
||||||
|
avoid_node_ids: &[DHTKey],
|
||||||
|
) -> EyreResult<Option<DHTKey>> {
|
||||||
|
let inner = &mut *self.inner.lock();
|
||||||
|
let routing_table = self.unlocked_inner.routing_table.clone();
|
||||||
|
let rti = &*routing_table.inner.read();
|
||||||
|
|
||||||
|
Ok(self.get_route_for_safety_spec_inner(
|
||||||
|
inner,
|
||||||
|
rti,
|
||||||
|
safety_spec,
|
||||||
|
Direction::Inbound.into(),
|
||||||
|
avoid_node_ids,
|
||||||
|
)?)
|
||||||
|
}
|
||||||
|
|
||||||
/// Assemble private route for publication
|
/// Assemble private route for publication
|
||||||
pub fn assemble_private_route(
|
pub fn assemble_private_route(
|
||||||
&self,
|
&self,
|
||||||
@ -1127,6 +1184,114 @@ impl RouteSpecStore {
|
|||||||
Ok(private_route)
|
Ok(private_route)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get or create a remote private route cache entry
|
||||||
|
fn with_create_remote_private_route<F, R>(
|
||||||
|
inner: &mut RouteSpecStoreInner,
|
||||||
|
cur_ts: u64,
|
||||||
|
key: &DHTKey,
|
||||||
|
f: F,
|
||||||
|
) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
|
||||||
|
{
|
||||||
|
let rpr = inner
|
||||||
|
.cache
|
||||||
|
.remote_private_route_cache
|
||||||
|
.entry(*key)
|
||||||
|
.and_modify(|rpr| {
|
||||||
|
if cur_ts - rpr.modified_ts >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
|
||||||
|
*rpr = RemotePrivateRouteInfo {
|
||||||
|
modified_ts: cur_ts,
|
||||||
|
seen_our_node_info: false,
|
||||||
|
last_replied_ts: None,
|
||||||
|
last_used_ts: None,
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
rpr.modified_ts = cur_ts;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.or_insert_with(|| RemotePrivateRouteInfo {
|
||||||
|
modified_ts: cur_ts,
|
||||||
|
seen_our_node_info: false,
|
||||||
|
last_replied_ts: None,
|
||||||
|
last_used_ts: None,
|
||||||
|
});
|
||||||
|
f(rpr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// get a remote private route cache entry
|
||||||
|
fn with_get_remote_private_route<F, R>(
|
||||||
|
inner: &mut RouteSpecStoreInner,
|
||||||
|
cur_ts: u64,
|
||||||
|
key: &DHTKey,
|
||||||
|
f: F,
|
||||||
|
) -> Option<R>
|
||||||
|
where
|
||||||
|
F: FnOnce(&RemotePrivateRouteInfo) -> R,
|
||||||
|
{
|
||||||
|
let rpr = inner.cache.remote_private_route_cache.get(key)?;
|
||||||
|
if cur_ts - rpr.modified_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
|
||||||
|
return Some(f(rpr));
|
||||||
|
}
|
||||||
|
inner.cache.remote_private_route_cache.remove(key);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check to see if this remote (not ours) private route has seen our node info yet
|
||||||
|
/// This returns true if we have sent non-safety-route node info to the
|
||||||
|
/// private route and gotten a response before
|
||||||
|
pub fn has_remote_private_route_seen_our_node_info(&self, key: &DHTKey) -> bool {
|
||||||
|
let inner = &mut *self.inner.lock();
|
||||||
|
let cur_ts = intf::get_timestamp();
|
||||||
|
Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| rpr.seen_our_node_info)
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a remote private route as having seen our node info {
|
||||||
|
pub fn mark_remote_private_route_seen_our_node_info(&self, key: &DHTKey) {
|
||||||
|
let inner = &mut *self.inner.lock();
|
||||||
|
let cur_ts = intf::get_timestamp();
|
||||||
|
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
||||||
|
rpr.seen_our_node_info = true;
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a remote private route as having replied to a question {
|
||||||
|
pub fn mark_remote_private_route_replied(&self, key: &DHTKey) {
|
||||||
|
let inner = &mut *self.inner.lock();
|
||||||
|
let cur_ts = intf::get_timestamp();
|
||||||
|
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
||||||
|
rpr.last_replied_ts = Some(cur_ts);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a remote private route as having beed used {
|
||||||
|
pub fn mark_remote_private_route_used(&self, key: &DHTKey) {
|
||||||
|
let inner = &mut *self.inner.lock();
|
||||||
|
let cur_ts = intf::get_timestamp();
|
||||||
|
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
||||||
|
rpr.last_used_ts = Some(cur_ts);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clear caches when local our local node info changes
|
||||||
|
pub fn local_node_info_changed(&self) {
|
||||||
|
let inner = &mut *self.inner.lock();
|
||||||
|
|
||||||
|
// Clean up local allocated routes
|
||||||
|
for (_k, v) in &mut inner.content.details {
|
||||||
|
// Must republish route now
|
||||||
|
v.published = false;
|
||||||
|
// Route is not known reachable now
|
||||||
|
v.reachable = false;
|
||||||
|
// We have yet to check it since local node info changed
|
||||||
|
v.last_checked_ts = None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up remote private routes
|
||||||
|
inner.cache.remote_private_route_cache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
/// Mark route as published
|
/// Mark route as published
|
||||||
/// When first deserialized, routes must be re-published in order to ensure they remain
|
/// When first deserialized, routes must be re-published in order to ensure they remain
|
||||||
/// in the RouteSpecStore.
|
/// in the RouteSpecStore.
|
||||||
|
@ -141,3 +141,184 @@ impl fmt::Display for Destination {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl RPCProcessor {
|
||||||
|
/// Convert the 'Destination' into a 'RespondTo' for a response
|
||||||
|
pub(super) fn get_destination_respond_to(
|
||||||
|
&self,
|
||||||
|
dest: &Destination,
|
||||||
|
) -> Result<NetworkResult<RespondTo>, RPCError> {
|
||||||
|
let routing_table = self.routing_table();
|
||||||
|
let rss = routing_table.route_spec_store();
|
||||||
|
|
||||||
|
match dest {
|
||||||
|
Destination::Direct {
|
||||||
|
target,
|
||||||
|
safety_selection,
|
||||||
|
} => match safety_selection {
|
||||||
|
SafetySelection::Unsafe(_) => {
|
||||||
|
// Sent directly with no safety route, can respond directly
|
||||||
|
Ok(NetworkResult::value(RespondTo::Sender))
|
||||||
|
}
|
||||||
|
SafetySelection::Safe(safety_spec) => {
|
||||||
|
// Sent directly but with a safety route, respond to private route
|
||||||
|
let Some(pr_key) = rss
|
||||||
|
.get_private_route_for_safety_spec(safety_spec, &[target.node_id()])
|
||||||
|
.map_err(RPCError::internal)? else {
|
||||||
|
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get the assembled route for response
|
||||||
|
let private_route = rss
|
||||||
|
.assemble_private_route(&pr_key, None)
|
||||||
|
.map_err(RPCError::internal)?;
|
||||||
|
|
||||||
|
Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route)))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Destination::Relay {
|
||||||
|
relay,
|
||||||
|
target,
|
||||||
|
safety_selection,
|
||||||
|
} => match safety_selection {
|
||||||
|
SafetySelection::Unsafe(_) => {
|
||||||
|
// Sent via a relay with no safety route, can respond directly
|
||||||
|
Ok(NetworkResult::value(RespondTo::Sender))
|
||||||
|
}
|
||||||
|
SafetySelection::Safe(safety_spec) => {
|
||||||
|
// Sent via a relay but with a safety route, respond to private route
|
||||||
|
let Some(pr_key) = rss
|
||||||
|
.get_private_route_for_safety_spec(safety_spec, &[relay.node_id(), *target])
|
||||||
|
.map_err(RPCError::internal)? else {
|
||||||
|
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get the assembled route for response
|
||||||
|
let private_route = rss
|
||||||
|
.assemble_private_route(&pr_key, None)
|
||||||
|
.map_err(RPCError::internal)?;
|
||||||
|
|
||||||
|
Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route)))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Destination::PrivateRoute {
|
||||||
|
private_route,
|
||||||
|
safety_selection,
|
||||||
|
} => {
|
||||||
|
let Some(pr_first_hop) = &private_route.first_hop else {
|
||||||
|
return Err(RPCError::internal("destination private route must have first_hop"));
|
||||||
|
};
|
||||||
|
|
||||||
|
match safety_selection {
|
||||||
|
SafetySelection::Unsafe(_) => {
|
||||||
|
// Sent to a private route with no safety route, use a stub safety route for the response
|
||||||
|
|
||||||
|
// Determine if we can use optimized nodeinfo
|
||||||
|
let route_node = match rss
|
||||||
|
.has_remote_private_route_seen_our_node_info(&private_route.public_key)
|
||||||
|
{
|
||||||
|
true => RouteNode::NodeId(NodeId::new(routing_table.node_id())),
|
||||||
|
false => RouteNode::PeerInfo(
|
||||||
|
routing_table.get_own_peer_info(RoutingDomain::PublicInternet),
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(NetworkResult::value(RespondTo::PrivateRoute(
|
||||||
|
PrivateRoute::new_stub(routing_table.node_id(), route_node),
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
SafetySelection::Safe(safety_spec) => {
|
||||||
|
// Sent directly but with a safety route, respond to private route
|
||||||
|
let avoid_node_id = match &pr_first_hop.node {
|
||||||
|
RouteNode::NodeId(n) => n.key,
|
||||||
|
RouteNode::PeerInfo(p) => p.node_id.key,
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(pr_key) = rss
|
||||||
|
.get_private_route_for_safety_spec(safety_spec, &[avoid_node_id])
|
||||||
|
.map_err(RPCError::internal)? else {
|
||||||
|
return Ok(NetworkResult::no_connection_other("no private route for response at this time"));
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get the assembled route for response
|
||||||
|
let private_route = rss
|
||||||
|
.assemble_private_route(&pr_key, None)
|
||||||
|
.map_err(RPCError::internal)?;
|
||||||
|
|
||||||
|
Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert the 'RespondTo' into a 'Destination' for a response
|
||||||
|
pub(super) fn get_respond_to_destination(
|
||||||
|
&self,
|
||||||
|
request: &RPCMessage,
|
||||||
|
) -> NetworkResult<Destination> {
|
||||||
|
// Get the question 'respond to'
|
||||||
|
let respond_to = match request.operation.kind() {
|
||||||
|
RPCOperationKind::Question(q) => q.respond_to(),
|
||||||
|
_ => {
|
||||||
|
panic!("not a question");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// To where should we respond?
|
||||||
|
match respond_to {
|
||||||
|
RespondTo::Sender => {
|
||||||
|
// Parse out the header detail from the question
|
||||||
|
let detail = match &request.header.detail {
|
||||||
|
RPCMessageHeaderDetail::Direct(detail) => detail,
|
||||||
|
RPCMessageHeaderDetail::SafetyRouted(_)
|
||||||
|
| RPCMessageHeaderDetail::PrivateRouted(_) => {
|
||||||
|
// If this was sent via a private route, we don't know what the sender was, so drop this
|
||||||
|
return NetworkResult::invalid_message(
|
||||||
|
"can't respond directly to non-direct question",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Reply directly to the request's source
|
||||||
|
let sender_id = detail.envelope.get_sender_id();
|
||||||
|
|
||||||
|
// This may be a different node's reference than the 'sender' in the case of a relay
|
||||||
|
let peer_noderef = detail.peer_noderef.clone();
|
||||||
|
|
||||||
|
// If the sender_id is that of the peer, then this is a direct reply
|
||||||
|
// else it is a relayed reply through the peer
|
||||||
|
if peer_noderef.node_id() == sender_id {
|
||||||
|
NetworkResult::value(Destination::direct(peer_noderef))
|
||||||
|
} else {
|
||||||
|
NetworkResult::value(Destination::relay(peer_noderef, sender_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RespondTo::PrivateRoute(pr) => {
|
||||||
|
match &request.header.detail {
|
||||||
|
RPCMessageHeaderDetail::Direct(_) => {
|
||||||
|
// If this was sent directly, we should only ever respond directly
|
||||||
|
return NetworkResult::invalid_message(
|
||||||
|
"not responding to private route from direct question",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
RPCMessageHeaderDetail::SafetyRouted(detail) => {
|
||||||
|
// If this was sent via a safety route, but not received over our private route, don't respond with a safety route,
|
||||||
|
// it would give away which safety routes belong to this node
|
||||||
|
NetworkResult::value(Destination::private_route(
|
||||||
|
pr.clone(),
|
||||||
|
SafetySelection::Unsafe(detail.sequencing),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
RPCMessageHeaderDetail::PrivateRouted(detail) => {
|
||||||
|
// If this was received over our private route, it's okay to respond to a private route via our safety route
|
||||||
|
NetworkResult::value(Destination::private_route(
|
||||||
|
pr.clone(),
|
||||||
|
SafetySelection::Safe(detail.safety_spec.clone()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -792,73 +792,6 @@ impl RPCProcessor {
|
|||||||
Ok(NetworkResult::value(()))
|
Ok(NetworkResult::value(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the 'RespondTo' into a 'Destination' for a response
|
|
||||||
fn get_respond_to_destination(&self, request: &RPCMessage) -> NetworkResult<Destination> {
|
|
||||||
// Get the question 'respond to'
|
|
||||||
let respond_to = match request.operation.kind() {
|
|
||||||
RPCOperationKind::Question(q) => q.respond_to(),
|
|
||||||
_ => {
|
|
||||||
panic!("not a question");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// To where should we respond?
|
|
||||||
match respond_to {
|
|
||||||
RespondTo::Sender => {
|
|
||||||
// Parse out the header detail from the question
|
|
||||||
let detail = match &request.header.detail {
|
|
||||||
RPCMessageHeaderDetail::Direct(detail) => detail,
|
|
||||||
RPCMessageHeaderDetail::SafetyRouted(_)
|
|
||||||
| RPCMessageHeaderDetail::PrivateRouted(_) => {
|
|
||||||
// If this was sent via a private route, we don't know what the sender was, so drop this
|
|
||||||
return NetworkResult::invalid_message(
|
|
||||||
"can't respond directly to non-direct question",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Reply directly to the request's source
|
|
||||||
let sender_id = detail.envelope.get_sender_id();
|
|
||||||
|
|
||||||
// This may be a different node's reference than the 'sender' in the case of a relay
|
|
||||||
let peer_noderef = detail.peer_noderef.clone();
|
|
||||||
|
|
||||||
// If the sender_id is that of the peer, then this is a direct reply
|
|
||||||
// else it is a relayed reply through the peer
|
|
||||||
if peer_noderef.node_id() == sender_id {
|
|
||||||
NetworkResult::value(Destination::direct(peer_noderef))
|
|
||||||
} else {
|
|
||||||
NetworkResult::value(Destination::relay(peer_noderef, sender_id))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
RespondTo::PrivateRoute(pr) => {
|
|
||||||
match &request.header.detail {
|
|
||||||
RPCMessageHeaderDetail::Direct(_) => {
|
|
||||||
// If this was sent directly, we should only ever respond directly
|
|
||||||
return NetworkResult::invalid_message(
|
|
||||||
"not responding to private route from direct question",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
RPCMessageHeaderDetail::SafetyRouted(detail) => {
|
|
||||||
// If this was sent via a safety route, but not received over our private route, don't respond with a safety route,
|
|
||||||
// it would give away which safety routes belong to this node
|
|
||||||
NetworkResult::value(Destination::private_route(
|
|
||||||
pr.clone(),
|
|
||||||
SafetySelection::Unsafe(detail.sequencing),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
RPCMessageHeaderDetail::PrivateRouted(detail) => {
|
|
||||||
// If this was received over our private route, it's okay to respond to a private route via our safety route
|
|
||||||
NetworkResult::value(Destination::private_route(
|
|
||||||
pr.clone(),
|
|
||||||
SafetySelection::Safe(detail.safety_spec.clone()),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Issue a reply over the network, possibly using an anonymized route
|
// Issue a reply over the network, possibly using an anonymized route
|
||||||
// The request must want a response, or this routine fails
|
// The request must want a response, or this routine fails
|
||||||
#[instrument(level = "debug", skip(self, request, answer), err)]
|
#[instrument(level = "debug", skip(self, request, answer), err)]
|
||||||
|
@ -1,53 +0,0 @@
|
|||||||
use super::*;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum Origin {
|
|
||||||
Sender,
|
|
||||||
PrivateRoute(PrivateRoute),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Origin {
|
|
||||||
pub fn sender() -> Self {
|
|
||||||
Self::Sender
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn private_route(private_route: PrivateRoute) -> Self {
|
|
||||||
Self::PrivateRoute(private_route)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_respond_to(self, destination: &Destination) -> Result<RespondTo, RPCError> {
|
|
||||||
match self {
|
|
||||||
Self::Sender => {
|
|
||||||
let peer = match destination {
|
|
||||||
Destination::Direct {
|
|
||||||
target,
|
|
||||||
safety_route_spec,
|
|
||||||
} => todo!(),
|
|
||||||
Destination::Relay {
|
|
||||||
relay,
|
|
||||||
target,
|
|
||||||
safety_route_spec,
|
|
||||||
} => todo!(),
|
|
||||||
Destination::PrivateRoute {
|
|
||||||
private_route,
|
|
||||||
safety_route_spec,
|
|
||||||
} => todo!(),
|
|
||||||
};
|
|
||||||
let routing_table = peer.routing_table();
|
|
||||||
let routing_domain = peer.best_routing_domain();
|
|
||||||
// Send some signed node info along with the question if this node needs to be replied to
|
|
||||||
if routing_table.has_valid_own_node_info()
|
|
||||||
&& !peer.has_seen_our_node_info(routing_domain)
|
|
||||||
{
|
|
||||||
let our_sni = self
|
|
||||||
.routing_table()
|
|
||||||
.get_own_signed_node_info(routing_domain);
|
|
||||||
RespondTo::Sender(Some(our_sni))
|
|
||||||
} else {
|
|
||||||
RespondTo::Sender(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Self::PrivateRoute(pr) => RespondTo::PrivateRoute(pr),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -10,7 +10,10 @@ impl RPCProcessor {
|
|||||||
message: Vec<u8>,
|
message: Vec<u8>,
|
||||||
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> {
|
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> {
|
||||||
let app_call_q = RPCOperationAppCallQ { message };
|
let app_call_q = RPCOperationAppCallQ { message };
|
||||||
let question = RPCQuestion::new(RespondTo::Sender, RPCQuestionDetail::AppCallQ(app_call_q));
|
let question = RPCQuestion::new(
|
||||||
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
|
RPCQuestionDetail::AppCallQ(app_call_q),
|
||||||
|
);
|
||||||
|
|
||||||
// Send the app call question
|
// Send the app call question
|
||||||
let waitable_reply = network_result_try!(self.question(dest, question).await?);
|
let waitable_reply = network_result_try!(self.question(dest, question).await?);
|
||||||
|
@ -28,7 +28,10 @@ impl RPCProcessor {
|
|||||||
|
|
||||||
let find_node_q_detail =
|
let find_node_q_detail =
|
||||||
RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id: key });
|
RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id: key });
|
||||||
let find_node_q = RPCQuestion::new(RespondTo::Sender, find_node_q_detail);
|
let find_node_q = RPCQuestion::new(
|
||||||
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
|
find_node_q_detail,
|
||||||
|
);
|
||||||
|
|
||||||
// Send the find_node request
|
// Send the find_node request
|
||||||
let waitable_reply = network_result_try!(self.question(dest, find_node_q).await?);
|
let waitable_reply = network_result_try!(self.question(dest, find_node_q).await?);
|
||||||
|
@ -70,7 +70,10 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let status_q = RPCOperationStatusQ { node_status };
|
let status_q = RPCOperationStatusQ { node_status };
|
||||||
let question = RPCQuestion::new(RespondTo::Sender, RPCQuestionDetail::StatusQ(status_q));
|
let question = RPCQuestion::new(
|
||||||
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
|
RPCQuestionDetail::StatusQ(status_q),
|
||||||
|
);
|
||||||
|
|
||||||
// Send the info request
|
// Send the info request
|
||||||
let waitable_reply = network_result_try!(self.question(dest.clone(), question).await?);
|
let waitable_reply = network_result_try!(self.question(dest.clone(), question).await?);
|
||||||
|
@ -582,7 +582,6 @@ impl VeilidAPI {
|
|||||||
let mut stability = Stability::LowLatency;
|
let mut stability = Stability::LowLatency;
|
||||||
let mut hop_count = default_route_hop_count;
|
let mut hop_count = default_route_hop_count;
|
||||||
let mut directions = DirectionSet::all();
|
let mut directions = DirectionSet::all();
|
||||||
let mut avoid_node_id = None;
|
|
||||||
|
|
||||||
while ai < args.len() {
|
while ai < args.len() {
|
||||||
if let Ok(seq) =
|
if let Ok(seq) =
|
||||||
@ -601,10 +600,6 @@ impl VeilidAPI {
|
|||||||
get_debug_argument_at(&args, ai, "debug_route", "direction_set", get_direction_set)
|
get_debug_argument_at(&args, ai, "debug_route", "direction_set", get_direction_set)
|
||||||
{
|
{
|
||||||
directions = ds;
|
directions = ds;
|
||||||
} else if let Ok(ani) =
|
|
||||||
get_debug_argument_at(&args, ai, "debug_route", "avoid_node_id", get_dht_key)
|
|
||||||
{
|
|
||||||
avoid_node_id = Some(ani);
|
|
||||||
} else {
|
} else {
|
||||||
return Ok(format!("Invalid argument specified: {}", args[ai]));
|
return Ok(format!("Invalid argument specified: {}", args[ai]));
|
||||||
}
|
}
|
||||||
@ -612,8 +607,7 @@ impl VeilidAPI {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Allocate route
|
// Allocate route
|
||||||
let out =
|
let out = match rss.allocate_route(stability, sequencing, hop_count, directions, &[]) {
|
||||||
match rss.allocate_route(stability, sequencing, hop_count, directions, avoid_node_id) {
|
|
||||||
Ok(Some(v)) => format!("{}", v.encode()),
|
Ok(Some(v)) => format!("{}", v.encode()),
|
||||||
Ok(None) => format!("<unavailable>"),
|
Ok(None) => format!("<unavailable>"),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -608,7 +608,7 @@ pub enum SafetySelection {
|
|||||||
pub struct SafetySpec {
|
pub struct SafetySpec {
|
||||||
/// preferred safety route if it still exists
|
/// preferred safety route if it still exists
|
||||||
pub preferred_route: Option<DHTKey>,
|
pub preferred_route: Option<DHTKey>,
|
||||||
/// 0 = no safety route, just use node's node id, more hops is safer but slower
|
/// must be greater than 0
|
||||||
pub hop_count: usize,
|
pub hop_count: usize,
|
||||||
/// prefer reliability over speed
|
/// prefer reliability over speed
|
||||||
pub stability: Stability,
|
pub stability: Stability,
|
||||||
|
Loading…
Reference in New Issue
Block a user