routing work

This commit is contained in:
John Smith 2023-02-23 21:07:46 -05:00
parent ed8703e507
commit 4823c979ab
8 changed files with 288 additions and 191 deletions

View File

@ -263,7 +263,7 @@ struct PeerInfo @0xfe2d722d5d3c4bcb {
struct RoutedOperation @0xcbcb8535b839e9dd { struct RoutedOperation @0xcbcb8535b839e9dd {
sequencing @0 :Sequencing; # sequencing preference to use to pass the message along sequencing @0 :Sequencing; # sequencing preference to use to pass the message along
signatures @1 :List(TypedSignature); # signatures from nodes that have handled the private route signatures @1 :List(Signature); # signatures from nodes that have handled the private route
nonce @2 :Nonce; # nonce Xmsg nonce @2 :Nonce; # nonce Xmsg
data @3 :Data; # operation encrypted with ENC(Xmsg,DH(PKapr,SKbsr)) data @3 :Data; # operation encrypted with ENC(Xmsg,DH(PKapr,SKbsr))
} }

View File

@ -33,7 +33,7 @@ pub trait CryptoSystem {
fn validate_hash_reader( fn validate_hash_reader(
&self, &self,
reader: &mut dyn std::io::Read, reader: &mut dyn std::io::Read,
dht_key: &PublicKey, key: &PublicKey,
) -> Result<bool, VeilidAPIError>; ) -> Result<bool, VeilidAPIError>;
// Distance Metric // Distance Metric
@ -42,13 +42,13 @@ pub trait CryptoSystem {
// Authentication // Authentication
fn sign( fn sign(
&self, &self,
dht_key: &PublicKey, key: &PublicKey,
dht_key_secret: &SecretKey, secret: &SecretKey,
data: &[u8], data: &[u8],
) -> Result<Signature, VeilidAPIError>; ) -> Result<Signature, VeilidAPIError>;
fn verify( fn verify(
&self, &self,
dht_key: &PublicKey, key: &PublicKey,
data: &[u8], data: &[u8],
signature: &Signature, signature: &Signature,
) -> Result<(), VeilidAPIError>; ) -> Result<(), VeilidAPIError>;

View File

@ -40,6 +40,6 @@ pub fn bytes_to_cache(bytes: &[u8], cache: &mut DHCache) {
let v = DHCacheValue { let v = DHCacheValue {
shared_secret: SharedSecret::new(d[64..96].try_into().expect("asdf")), shared_secret: SharedSecret::new(d[64..96].try_into().expect("asdf")),
}; };
cache.insert(k, v); cache.insert(k, v, |_k, _v| {});
} }
} }

View File

@ -269,10 +269,13 @@ impl Crypto {
secret: &SecretKey, secret: &SecretKey,
) -> Result<SharedSecret, VeilidAPIError> { ) -> Result<SharedSecret, VeilidAPIError> {
Ok( Ok(
match self.inner.lock().dh_cache.entry(DHCacheKey { match self.inner.lock().dh_cache.entry(
DHCacheKey {
key: *key, key: *key,
secret: *secret, secret: *secret,
}) { },
|_k, _v| {},
) {
Entry::Occupied(e) => e.get().shared_secret, Entry::Occupied(e) => e.get().shared_secret,
Entry::Vacant(e) => { Entry::Vacant(e) => {
let shared_secret = vcrypto.compute_dh(key, secret)?; let shared_secret = vcrypto.compute_dh(key, secret)?;

View File

@ -136,7 +136,9 @@ impl ConnectionTable {
}; };
// Add the connection to the table // Add the connection to the table
let res = inner.conn_by_id[protocol_index].insert(id, network_connection); let res = inner.conn_by_id[protocol_index].insert(id, network_connection, |_k, _v| {
// never lrus, unbounded
});
assert!(res.is_none()); assert!(res.is_none());
// if we have reached the maximum number of connections per protocol type // if we have reached the maximum number of connections per protocol type

View File

@ -164,12 +164,19 @@ impl RouteStats {
#[archive_attr(repr(C), derive(CheckBytes))] #[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteSpecDetail { pub struct RouteSpecDetail {
/// Crypto kind /// Crypto kind
crypto_kind: CryptoKind, pub crypto_kind: CryptoKind,
/// Secret key /// Secret key
#[with(Skip)] #[with(Skip)]
secret_key: SecretKey, pub secret_key: SecretKey,
/// Route hops (node id keys) /// Route hops (node id keys)
hops: Vec<PublicKey>, pub hops: Vec<PublicKey>,
}
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct RouteSetSpecDetail {
/// Route set per crypto kind
route_set: BTreeMap<PublicKey, RouteSpecDetail>,
/// Route noderefs /// Route noderefs
#[with(Skip)] #[with(Skip)]
hop_node_refs: Vec<NodeRef>, hop_node_refs: Vec<NodeRef>,
@ -188,9 +195,19 @@ pub struct RouteSpecDetail {
stats: RouteStats, stats: RouteStats,
} }
impl RouteSpecDetail { impl RouteSetSpecDetail {
pub fn get_crypto_kind(&self) -> CryptoKind { pub fn get_route_by_key(&self, key: PublicKey) -> Option<&RouteSpecDetail> {
self.crypto_kind self.route_set.get(&key)
}
pub fn get_route_by_key_mut(&mut self, key: PublicKey) -> Option<&mut RouteSpecDetail> {
self.route_set.get_mut(&key)
}
pub fn get_route_set_keys(&self) -> TypedKeySet {
let mut tks = TypedKeySet::new();
for (k, v) in &self.route_set {
tks.add(TypedKey::new(v.crypto_kind, *k));
}
tks
} }
pub fn get_stats(&self) -> &RouteStats { pub fn get_stats(&self) -> &RouteStats {
&self.stats &self.stats
@ -202,10 +219,7 @@ impl RouteSpecDetail {
self.published self.published
} }
pub fn hop_count(&self) -> usize { pub fn hop_count(&self) -> usize {
self.hops.len() self.hop_node_refs.len()
}
pub fn get_secret_key(&self) -> SecretKey {
self.secret_key
} }
pub fn get_stability(&self) -> Stability { pub fn get_stability(&self) -> Stability {
self.stability self.stability
@ -225,15 +239,60 @@ impl RouteSpecDetail {
#[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C, align(8)), derive(CheckBytes))] #[archive_attr(repr(C, align(8)), derive(CheckBytes))]
pub struct RouteSpecStoreContent { pub struct RouteSpecStoreContent {
/// All of the routes we have allocated so far /// All of the route sets we have allocated so far indexed by key
details: HashMap<PublicKey, RouteSpecDetail>, id_by_key: HashMap<PublicKey, String>,
/// All of the route sets we have allocated so far
details: HashMap<String, RouteSetSpecDetail>,
}
impl RouteSpecStoreContent {
pub fn add_detail(&mut self, detail: RouteSetSpecDetail) -> String {
// generate unique key string
let mut idbytes = [0u8; 16];
for (pk, _) in &detail.route_set {
for (i, x) in pk.bytes.iter().enumerate() {
idbytes[i % 16] ^= *x;
}
}
let id = format!("{:08x}-{:04x}-{:04x}-{:04x}-{:08x}{:04x}",
u32::from_be_bytes(idbytes[0..4].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[4..6].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[6..8].try_into().expect("16 bits")),
u16::from_be_bytes(idbytes[8..10].try_into().expect("16 bits")),
u32::from_be_bytes(idbytes[10..14].try_into().expect("32 bits")),
u16::from_be_bytes(idbytes[14..16].try_into().expect("16 bits")));
// also store in id by key table
for (pk, _) in &detail.route_set {
self.id_by_key.insert(*pk, id.clone());
}
self.details.insert(id.clone(), detail);
id
}
pub fn remove_detail(&mut self, id: &String) {
let detail = self.details.remove(id).unwrap();
for (pk, _) in &detail.route_set {
self.id_by_key.remove(&pk).unwrap();
}
}
pub fn get_detail(&self, id: &String) -> Option<&RouteSetSpecDetail> {
self.details.get(id)
}
pub fn get_detail_mut(&mut self, id: &String) -> Option<&mut RouteSetSpecDetail> {
self.details.get_mut(id)
}
pub fn get_id_by_key(&self, key: &PublicKey) -> Option<String> {
self.id_by_key.get(key).cloned()
}
} }
/// What remote private routes have seen /// What remote private routes have seen
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct RemotePrivateRouteInfo { pub struct RemotePrivateRouteInfo {
/// The private route itself /// The private routes themselves
private_route: Option<PrivateRoute>, private_routes: HashMap<PublicKey, PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use /// Did this remote private route see our node info due to no safety route in use
last_seen_our_node_info_ts: Timestamp, last_seen_our_node_info_ts: Timestamp,
/// Last time this remote private route was requested for any reason (cache expiration) /// Last time this remote private route was requested for any reason (cache expiration)
@ -260,8 +319,10 @@ pub struct RouteSpecStoreCache {
used_end_nodes: HashMap<TypedKey, usize>, used_end_nodes: HashMap<TypedKey, usize>,
/// Route spec hop cache, used to quickly disqualify routes /// Route spec hop cache, used to quickly disqualify routes
hop_cache: HashSet<Vec<u8>>, hop_cache: HashSet<Vec<u8>>,
/// Has a remote private route responded to a question and when /// Remote private routes we've imported and statistics
remote_private_route_cache: LruCache<PublicKey, RemotePrivateRouteInfo>, remote_private_route_set_cache: LruCache<String, RemotePrivateRouteInfo>,
/// Remote private routes indexed by public key
remote_private_routes_by_key: HashMap<PublicKey, String>,
/// Compiled route cache /// Compiled route cache
compiled_route_cache: LruCache<CompiledRouteCacheKey, SafetyRoute>, compiled_route_cache: LruCache<CompiledRouteCacheKey, SafetyRoute>,
/// List of dead allocated routes /// List of dead allocated routes
@ -270,13 +331,35 @@ pub struct RouteSpecStoreCache {
dead_remote_routes: Vec<PublicKey>, dead_remote_routes: Vec<PublicKey>,
} }
impl RouteSpecStoreCache {
pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self
.used_nodes
.get(&k)
.cloned()
.unwrap_or_default()
})
}
pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self
.used_end_nodes
.get(&k)
.cloned()
.unwrap_or_default()
})
}
}
impl Default for RouteSpecStoreCache { impl Default for RouteSpecStoreCache {
fn default() -> Self { fn default() -> Self {
Self { Self {
used_nodes: Default::default(), used_nodes: Default::default(),
used_end_nodes: Default::default(), used_end_nodes: Default::default(),
hop_cache: Default::default(), hop_cache: Default::default(),
remote_private_route_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE), remote_private_route_set_cache: LruCache::new(REMOTE_PRIVATE_ROUTE_CACHE_SIZE),
remote_private_routes_by_key: HashMap::new(),
compiled_route_cache: LruCache::new(COMPILED_ROUTE_CACHE_SIZE), compiled_route_cache: LruCache::new(COMPILED_ROUTE_CACHE_SIZE),
dead_routes: Default::default(), dead_routes: Default::default(),
dead_remote_routes: Default::default(), dead_remote_routes: Default::default(),
@ -317,19 +400,19 @@ pub struct RouteSpecStore {
unlocked_inner: Arc<RouteSpecStoreUnlockedInner>, unlocked_inner: Arc<RouteSpecStoreUnlockedInner>,
} }
fn route_hops_to_hop_cache(hops: &[PublicKey]) -> Vec<u8> { fn route_hops_to_hop_cache(hops: &[NodeRef]) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH); let mut cache: Vec<u8> = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH);
for hop in hops { for hop in hops {
cache.extend_from_slice(&hop.bytes); cache.extend_from_slice(&hop.best_node_id().key.bytes);
} }
cache cache
} }
/// get the hop cache key for a particular route permutation /// get the hop cache key for a particular route permutation
fn route_permutation_to_hop_cache(nodes: &[PeerInfo], perm: &[usize]) -> Vec<u8> { fn route_permutation_to_hop_cache(rti: &RoutingTableInner, nodes: &[NodeRef], perm: &[usize]) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH); let mut cache: Vec<u8> = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH);
for n in perm { for n in perm {
cache.extend_from_slice(&nodes[*n].node_id.key.bytes) cache.extend_from_slice(&nodes[*n].locked(rti).best_node_id().key.bytes)
} }
cache cache
} }
@ -416,6 +499,7 @@ impl RouteSpecStore {
}), }),
inner: Arc::new(Mutex::new(RouteSpecStoreInner { inner: Arc::new(Mutex::new(RouteSpecStoreInner {
content: RouteSpecStoreContent { content: RouteSpecStoreContent {
id_by_key: HashMap::new(),
details: HashMap::new(), details: HashMap::new(),
}, },
cache: Default::default(), cache: Default::default(),
@ -441,39 +525,62 @@ impl RouteSpecStore {
rsstdb.load_rkyv(0, b"content")?.unwrap_or_default(); rsstdb.load_rkyv(0, b"content")?.unwrap_or_default();
// Look up all route hop noderefs since we can't serialize those // Look up all route hop noderefs since we can't serialize those
let mut dead_keys = Vec::new(); let mut dead_ids = Vec::new();
for (k, rsd) in &mut content.details { for (rsid, rssd) in &mut content.details {
// Get first route since they all should resolve
let Some((pk, rsd)) = rssd.route_set.first_key_value() else {
dead_ids.push(rsid.clone());
continue;
};
// Go through first route
for h in &rsd.hops { for h in &rsd.hops {
let Some(nr) = routing_table.lookup_node_ref(TypedKey::new(rsd.crypto_kind, *h)) else { let Some(nr) = routing_table.lookup_node_ref(TypedKey::new(rsd.crypto_kind, *h)) else {
dead_keys.push(*k); dead_ids.push(rsid.clone());
break; break;
}; };
rsd.hop_node_refs.push(nr); rssd.hop_node_refs.push(nr);
} }
} }
for k in dead_keys { for id in dead_ids {
log_rtab!(debug "no entry, killing off private route: {}", k.encode()); log_rtab!(debug "no entry, killing off private route: {}", id);
content.details.remove(&k); content.remove_detail(&id);
} }
// Load secrets from pstore // Load secrets from pstore
let pstore = routing_table.network_manager().protected_store(); let pstore = routing_table.network_manager().protected_store();
let out: Vec<KeyPair> = pstore let secret_key_map: HashMap<PublicKey, SecretKey> = pstore
.load_user_secret_rkyv("RouteSpecStore") .load_user_secret_rkyv("RouteSpecStore")
.await? .await?
.unwrap_or_default(); .unwrap_or_default();
let mut dead_keys = Vec::new(); // Ensure we got secret keys for all the public keys
for KeyPair { key, secret } in out { let mut got_secret_key_ids = HashSet::new();
if let Some(rsd) = content.details.get_mut(&key) { for (rsid, rssd) in &mut content.details {
rsd.secret_key = secret; let mut found_all = true;
for (pk, rsd) in &mut rssd.route_set {
if let Some(sk) = secret_key_map.get(pk) {
rsd.secret_key = *sk;
} else { } else {
dead_keys.push(key); found_all = false;
break;
} }
} }
for k in dead_keys { if found_all {
log_rtab!(debug "killing off private route: {}", k.encode()); got_secret_key_ids.insert(rsid.clone());
content.details.remove(&k); }
}
// If we missed any, nuke those route ids
let dead_ids:Vec<String> = content.details.keys().filter_map(|id| {
if !got_secret_key_ids.contains(id) {
Some(id.clone())
} else {
None
}
}).collect();
for id in dead_ids {
log_rtab!(debug "missing secret key, killing off private route: {}", id);
content.remove_detail(&id);
} }
let mut inner = RouteSpecStoreInner { let mut inner = RouteSpecStoreInner {
@ -504,6 +611,7 @@ impl RouteSpecStore {
}; };
// Save all the fields we care about to the frozen blob in table storage // Save all the fields we care about to the frozen blob in table storage
// This skips #[with(Skip)] saving the secret keys, we save them in the protected store instead
let table_store = self let table_store = self
.unlocked_inner .unlocked_inner
.routing_table .routing_table
@ -519,12 +627,11 @@ impl RouteSpecStore {
.network_manager() .network_manager()
.protected_store(); .protected_store();
let mut out: Vec<KeyPair> = Vec::with_capacity(content.details.len()); let mut out: HashMap<PublicKey, SecretKey> = HashMap::new();
for (k, v) in &content.details { for (rsid, rssd) in &content.details {
out.push(KeyPair { for (pk, rsd) in &rssd.route_set {
key: *k, out.insert(*pk, rsd.secret_key);
secret: v.secret_key, }
});
} }
let _ = pstore.save_user_secret_rkyv("RouteSpecStore", &out).await?; // ignore if this previously existed or not let _ = pstore.save_user_secret_rkyv("RouteSpecStore", &out).await?; // ignore if this previously existed or not
@ -555,44 +662,33 @@ impl RouteSpecStore {
update_callback(update); update_callback(update);
} }
fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec<u8>, rsd: &RouteSpecDetail) { fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec<u8>, rssd: &RouteSetSpecDetail) {
if !cache.hop_cache.insert(cache_key) { if !cache.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice"); panic!("route should never be inserted twice");
} }
for (pk, rsd) in &rssd.route_set {
for h in &rsd.hops { for h in &rsd.hops {
cache cache
.used_nodes .used_nodes
.entry(*h) .entry(TypedKey::new(rsd.crypto_kind, *h))
.and_modify(|e| *e += 1) .and_modify(|e| *e += 1)
.or_insert(1); .or_insert(1);
} }
cache cache
.used_end_nodes .used_end_nodes
.entry(*rsd.hops.last().unwrap()) .entry(TypedKey::new(rsd.crypto_kind, *rsd.hops.last().unwrap()))
.and_modify(|e| *e += 1) .and_modify(|e| *e += 1)
.or_insert(1); .or_insert(1);
} }
}
fn rebuild_cache(inner: &mut RouteSpecStoreInner) { fn rebuild_cache(inner: &mut RouteSpecStoreInner) {
for v in inner.content.details.values() { for rssd in inner.content.details.values() {
let cache_key = route_hops_to_hop_cache(&v.hops); let cache_key = route_hops_to_hop_cache(&rssd.hop_node_refs);
Self::add_to_cache(&mut inner.cache, cache_key, &v); Self::add_to_cache(&mut inner.cache, cache_key, &rssd);
} }
} }
fn detail<'a>(
inner: &'a RouteSpecStoreInner,
route_spec_key: &PublicKey,
) -> Option<&'a RouteSpecDetail> {
inner.content.details.get(route_spec_key)
}
fn detail_mut<'a>(
inner: &'a mut RouteSpecStoreInner,
route_spec_key: &PublicKey,
) -> Option<&'a mut RouteSpecDetail> {
inner.content.details.get_mut(route_spec_key)
}
/// Purge the route spec store /// Purge the route spec store
pub async fn purge(&self) -> EyreResult<()> { pub async fn purge(&self) -> EyreResult<()> {
{ {
@ -607,7 +703,7 @@ impl RouteSpecStore {
/// Prefers nodes that are not currently in use by another route /// Prefers nodes that are not currently in use by another route
/// The route is not yet tested for its reachability /// The route is not yet tested for its reachability
/// Returns None if no route could be allocated at this time /// Returns None if no route could be allocated at this time
/// Returns Some list of public keys for the requested set of crypto kinds /// Returns Some route id string
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", skip(self), ret, err)]
pub fn allocate_route( pub fn allocate_route(
&self, &self,
@ -617,7 +713,7 @@ impl RouteSpecStore {
hop_count: usize, hop_count: usize,
directions: DirectionSet, directions: DirectionSet,
avoid_nodes: &[TypedKey], avoid_nodes: &[TypedKey],
) -> EyreResult<Option<TypedKeySet>> { ) -> EyreResult<Option<String>> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let routing_table = self.unlocked_inner.routing_table.clone(); let routing_table = self.unlocked_inner.routing_table.clone();
let rti = &mut *routing_table.inner.write(); let rti = &mut *routing_table.inner.write();
@ -645,7 +741,7 @@ impl RouteSpecStore {
hop_count: usize, hop_count: usize,
directions: DirectionSet, directions: DirectionSet,
avoid_nodes: &[TypedKey], avoid_nodes: &[TypedKey],
) -> EyreResult<Option<TypedKeySet>> { ) -> EyreResult<Option<String>> {
use core::cmp::Ordering; use core::cmp::Ordering;
if hop_count < 1 { if hop_count < 1 {
@ -666,7 +762,7 @@ impl RouteSpecStore {
// Get list of all nodes, and sort them for selection // Get list of all nodes, and sort them for selection
let cur_ts = get_aligned_timestamp(); let cur_ts = get_aligned_timestamp();
let filter = Box::new( let filter = Box::new(
move |rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> bool { |rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> bool {
// Exclude our own node from routes // Exclude our own node from routes
if entry.is_none() { if entry.is_none() {
return false; return false;
@ -674,14 +770,14 @@ impl RouteSpecStore {
let entry = entry.unwrap(); let entry = entry.unwrap();
// Exclude our relay if we have one // Exclude our relay if we have one
if let Some(own_relay_nr) = opt_own_relay_nr { if let Some(own_relay_nr) = &opt_own_relay_nr {
if own_relay_nr.same_bucket_entry(&entry) { if own_relay_nr.same_bucket_entry(&entry) {
return false; return false;
} }
} }
// Process node info exclusions // Process node info exclusions
let keep = entry.with(rti, |_rti, e| { let keep = entry.with_inner(|e| {
// Exclude nodes that don't have our requested crypto kinds // Exclude nodes that don't have our requested crypto kinds
let common_ck = e.common_crypto_kinds(crypto_kinds); let common_ck = e.common_crypto_kinds(crypto_kinds);
@ -712,7 +808,7 @@ impl RouteSpecStore {
return false; return false;
} }
// Exclude nodes whose relay is our own relay if we have one // Exclude nodes whose relay is our own relay if we have one
if let Some(own_relay_nr) = opt_own_relay_nr { if let Some(own_relay_nr) = &opt_own_relay_nr {
if relay_ids.contains_any(&own_relay_nr.node_ids()) { if relay_ids.contains_any(&own_relay_nr.node_ids()) {
return false; return false;
} }
@ -725,7 +821,7 @@ impl RouteSpecStore {
} }
// Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route // Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route
entry.with(rti, |_rti, e| { entry.with_inner(|e| {
let node_info_ok = let node_info_ok =
if let Some(sni) = e.signed_node_info(RoutingDomain::PublicInternet) { if let Some(sni) = e.signed_node_info(RoutingDomain::PublicInternet) {
sni.has_sequencing_matched_dial_info(sequencing) sni.has_sequencing_matched_dial_info(sequencing)
@ -749,41 +845,23 @@ impl RouteSpecStore {
entry2: &Option<Arc<BucketEntry>>| entry2: &Option<Arc<BucketEntry>>|
-> Ordering { -> Ordering {
// Our own node is filtered out
// xxx also sort my most overlapping crypto kinds let entry1 = entry1.unwrap();
let entry2 = entry2.unwrap();
let entry1_node_ids = entry1.with_inner(|e| e.node_ids());
let entry2_node_ids = entry2.with_inner(|e| e.node_ids());
// deprioritize nodes that we have already used as end points // deprioritize nodes that we have already used as end points
let e1_used_end = inner let e1_used_end = inner.cache.get_used_end_node_count(&entry1_node_ids);
.cache let e2_used_end = inner.cache.get_used_end_node_count(&entry2_node_ids);
.used_end_nodes
.get(&v1.0)
.cloned()
.unwrap_or_default();
let e2_used_end = inner
.cache
.used_end_nodes
.get(&v2.0)
.cloned()
.unwrap_or_default();
let cmp_used_end = e1_used_end.cmp(&e2_used_end); let cmp_used_end = e1_used_end.cmp(&e2_used_end);
if !matches!(cmp_used_end, Ordering::Equal) { if !matches!(cmp_used_end, Ordering::Equal) {
return cmp_used_end; return cmp_used_end;
} }
// deprioritize nodes we have used already anywhere // deprioritize nodes we have used already anywhere
let e1_used = inner let e1_used = inner.cache.get_used_node_count(&entry1_node_ids);
.cache let e2_used = inner.cache.get_used_node_count(&entry2_node_ids);
.used_nodes
.get(&v1.0)
.cloned()
.unwrap_or_default();
let e2_used = inner
.cache
.used_nodes
.get(&v2.0)
.cloned()
.unwrap_or_default();
let cmp_used = e1_used.cmp(&e2_used); let cmp_used = e1_used.cmp(&e2_used);
if !matches!(cmp_used, Ordering::Equal) { if !matches!(cmp_used, Ordering::Equal) {
return cmp_used; return cmp_used;
@ -793,10 +871,8 @@ impl RouteSpecStore {
// ensureordered will be taken care of by filter // ensureordered will be taken care of by filter
// and nopreference doesn't care // and nopreference doesn't care
if matches!(sequencing, Sequencing::PreferOrdered) { if matches!(sequencing, Sequencing::PreferOrdered) {
let cmp_seq = v1.1.as_ref().unwrap().with(rti, |rti, e1| { let cmp_seq = entry1.with_inner(|e1| {
v2.1.as_ref() entry2.with_inner(|e2| {
.unwrap()
.with(rti, |_rti, e2| {
let e1_can_do_ordered = e1.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); let e1_can_do_ordered = e1.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false);
let e2_can_do_ordered = e2.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false); let e2_can_do_ordered = e2.signed_node_info(RoutingDomain::PublicInternet).map(|sni| sni.has_sequencing_matched_dial_info(sequencing)).unwrap_or(false);
e2_can_do_ordered.cmp(&e1_can_do_ordered) e2_can_do_ordered.cmp(&e1_can_do_ordered)
@ -808,10 +884,8 @@ impl RouteSpecStore {
} }
// always prioritize reliable nodes, but sort by oldest or fastest // always prioritize reliable nodes, but sort by oldest or fastest
let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| { let cmpout = entry1.with_inner(|e1| {
v2.1.as_ref() entry2.with_inner(|e2| match stability {
.unwrap()
.with(rti, |_rti, e2| match stability {
Stability::LowLatency => { Stability::LowLatency => {
BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2) BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2)
} }
@ -822,23 +896,16 @@ impl RouteSpecStore {
}); });
cmpout cmpout
}; };
let routing_table = self.unlocked_inner.routing_table.clone();
let transform = let transform =
|rti: &RoutingTableInner, k: PublicKey, v: Option<Arc<BucketEntry>>| -> PeerInfo { |rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> NodeRef {
// Return the peerinfo for that key NodeRef::new(routing_table.clone(), entry.unwrap(), None)
v.unwrap().with(rti, |_rti, e| {
e.make_peer_info(k, RoutingDomain::PublicInternet.into())
.unwrap()
.clone()
})
}; };
// Pull the whole routing table in sorted order // Pull the whole routing table in sorted order
let node_count = rti.get_entry_count(
RoutingDomain::PublicInternet.into(),
BucketEntryState::Unreliable,
);
let nodes = let nodes =
rti.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform); rti.find_peers_with_sort_and_filter(usize::MAX, cur_ts, filters, compare, transform);
// If we couldn't find enough nodes, wait until we have more nodes in the routing table // If we couldn't find enough nodes, wait until we have more nodes in the routing table
if nodes.len() < hop_count { if nodes.len() < hop_count {
@ -846,10 +913,13 @@ impl RouteSpecStore {
return Ok(None); return Ok(None);
} }
// Get peer info for everything
let nodes_pi: Vec<PeerInfo> = nodes.iter().map(|nr| nr.locked(rti).make_peer_info(RoutingDomain::PublicInternet).unwrap()).collect();
// Now go through nodes and try to build a route we haven't seen yet // Now go through nodes and try to build a route we haven't seen yet
let perm_func = Box::new(|permutation: &[usize]| { let perm_func = Box::new(|permutation: &[usize]| {
// Get the route cache key // Get the route cache key
let cache_key = route_permutation_to_hop_cache(&nodes, permutation); let cache_key = route_permutation_to_hop_cache(rti, &nodes, permutation);
// Skip routes we have already seen // Skip routes we have already seen
if inner.cache.hop_cache.contains(&cache_key) { if inner.cache.hop_cache.contains(&cache_key) {
@ -857,15 +927,16 @@ impl RouteSpecStore {
} }
// Ensure the route doesn't contain both a node and its relay // Ensure the route doesn't contain both a node and its relay
let mut seen_nodes: HashSet<PublicKey> = HashSet::new(); let mut seen_nodes: HashSet<TypedKey> = HashSet::new();
for n in permutation { for n in permutation {
let node = nodes.get(*n).unwrap(); let node = nodes.get(*n).unwrap().locked(rti);
if !seen_nodes.insert(node.node_id.key) { if !seen_nodes.insert(node.best_node_id()) {
// Already seen this node, should not be in the route twice // Already seen this node, should not be in the route twice
return None; return None;
} }
if let Some(relay_id) = node.signed_node_info.relay_id() { if let Some(relay) = node.relay(RoutingDomain::PublicInternet) {
if !seen_nodes.insert(relay_id.key) { let relay_id = relay.locked(rti).best_node_id();
if !seen_nodes.insert(relay_id) {
// Already seen this node, should not be in the route twice // Already seen this node, should not be in the route twice
return None; return None;
} }
@ -878,7 +949,7 @@ impl RouteSpecStore {
let mut previous_node = &our_peer_info; let mut previous_node = &our_peer_info;
let mut reachable = true; let mut reachable = true;
for n in permutation { for n in permutation {
let current_node = nodes.get(*n).unwrap(); let current_node = nodes_pi.get(*n).unwrap();
let cm = rti.get_contact_method( let cm = rti.get_contact_method(
RoutingDomain::PublicInternet, RoutingDomain::PublicInternet,
previous_node, previous_node,
@ -915,7 +986,7 @@ impl RouteSpecStore {
let mut next_node = &our_peer_info; let mut next_node = &our_peer_info;
let mut reachable = true; let mut reachable = true;
for n in permutation.iter().rev() { for n in permutation.iter().rev() {
let current_node = nodes.get(*n).unwrap(); let current_node = nodes_pi.get(*n).unwrap();
let cm = rti.get_contact_method( let cm = rti.get_contact_method(
RoutingDomain::PublicInternet, RoutingDomain::PublicInternet,
next_node, next_node,
@ -970,23 +1041,26 @@ impl RouteSpecStore {
return Ok(None); return Ok(None);
} }
// Got a unique route, lets build the detail, register it, and return it // Got a unique route, lets build the details, register it, and return it
let hops: Vec<PublicKey> = route_nodes.iter().map(|v| nodes[*v].node_id.key).collect(); let hop_node_refs:Vec<NodeRef> = route_nodes
let hop_node_refs = hops
.iter() .iter()
.map(|k| { .map(|k| nodes[*k].clone())
rti.lookup_node_ref(self.unlocked_inner.routing_table.clone(), *k)
.unwrap()
})
.collect(); .collect();
let mut route_set = BTreeMap::<PublicKey, RouteSpecDetail>::new();
for crypto_kind in crypto_kinds.iter().copied() {
let vcrypto = self.unlocked_inner.routing_table.crypto().get(crypto_kind).unwrap();
let (public_key, secret_key) = vcrypto.generate_keypair();
let hops: Vec<PublicKey> = route_nodes.iter().map(|v| nodes[*v].node_ids().get(crypto_kind).unwrap().key).collect();
let (public_key, secret_key) = generate_secret(); route_set.insert(public_key, RouteSpecDetail {
crypto_kind,
let rsd = RouteSpecDetail {
secret_key, secret_key,
hops, hops,
});
}
let rssd = RouteSetSpecDetail {
route_set,
hop_node_refs, hop_node_refs,
published: false, published: false,
directions, directions,
@ -998,18 +1072,19 @@ impl RouteSpecStore {
drop(perm_func); drop(perm_func);
// Add to cache // Add to cache
Self::add_to_cache(&mut inner.cache, cache_key, &rsd); Self::add_to_cache(&mut inner.cache, cache_key, &rssd);
// Keep route in spec store // Keep route in spec store
inner.content.details.insert(public_key, rsd); let id = inner.content.add_detail(rssd);
Ok(Some(public_key)) Ok(Some(id))
} }
/// validate data using a private route's key and signature chain
#[instrument(level = "trace", skip(self, data, callback), ret)] #[instrument(level = "trace", skip(self, data, callback), ret)]
pub fn with_signature_validated_route<F,R>( pub fn with_signature_validated_route<F,R>(
&self, &self,
public_key: &PublicKey, public_key: &TypedKey,
signatures: &[Signature], signatures: &[Signature],
data: &[u8], data: &[u8],
last_hop_id: PublicKey, last_hop_id: PublicKey,
@ -1019,8 +1094,22 @@ impl RouteSpecStore {
R: fmt::Debug, R: fmt::Debug,
{ {
let inner = &*self.inner.lock(); let inner = &*self.inner.lock();
let Some(rsd) = Self::detail(inner, &public_key) else { let crypto = self.unlocked_inner.routing_table.crypto();
log_rpc!(debug "route does not exist: {:?}", public_key); let Some(vcrypto) = crypto.get(public_key.kind) else {
log_rpc!(debug "can't handle route with public key: {:?}", public_key);
return None;
};
let Some(rsid) = inner.content.get_id_by_key(&public_key.key) else {
log_rpc!(debug "route id does not exist: {:?}", public_key.key);
return None;
};
let Some(rssd) = inner.content.get_detail(&rsid) else {
log_rpc!(debug "route detail does not exist: {:?}", rsid);
return None;
};
let Some(rsd) = rssd.route_set.get(&public_key.key) else {
log_rpc!(debug "route set {:?} does not have key: {:?}", rsid, public_key.key);
return None; return None;
}; };
@ -1042,7 +1131,7 @@ impl RouteSpecStore {
} }
} else { } else {
// Verify a signature for a hop node along the route // Verify a signature for a hop node along the route
if let Err(e) = verify(hop_public_key, data, &signatures[hop_n]) { if let Err(e) = vcrypto.verify(hop_public_key, data, &signatures[hop_n]) {
log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e); log_rpc!(debug "failed to verify signature for hop {} at {} on private route {}: {}", hop_n, hop_public_key, public_key, e);
return None; return None;
} }
@ -1053,10 +1142,11 @@ impl RouteSpecStore {
} }
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", skip(self), ret, err)]
async fn test_allocated_route(&self, key: &TypedKey) -> EyreResult<bool> { async fn test_allocated_route(&self, id: &String) -> EyreResult<bool> {
// Make loopback route to test with // Make loopback route to test with
let dest = { let dest = {
let private_route = self.assemble_private_route(key, None)?; xxx figure out how to pick best crypto for the private route
let private_route = self.assemble_private_route(id, None)?;
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?; let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
@ -1281,7 +1371,7 @@ impl RouteSpecStore {
/// List all allocated routes /// List all allocated routes
pub fn list_allocated_routes<F, R>(&self, mut filter: F) -> Vec<R> pub fn list_allocated_routes<F, R>(&self, mut filter: F) -> Vec<R>
where where
F: FnMut(&PublicKey, &RouteSpecDetail) -> Option<R>, F: FnMut(&PublicKey, &RouteSetSpecDetail) -> Option<R>,
{ {
let inner = self.inner.lock(); let inner = self.inner.lock();
let mut out = Vec::with_capacity(inner.content.details.len()); let mut out = Vec::with_capacity(inner.content.details.len());
@ -1492,7 +1582,7 @@ impl RouteSpecStore {
// (outer hop is a RouteHopData, not a RouteHop). // (outer hop is a RouteHopData, not a RouteHop).
// Each loop mutates 'nonce', and 'blob_data' // Each loop mutates 'nonce', and 'blob_data'
let mut nonce = Crypto::get_random_nonce(); let mut nonce = Crypto::get_random_nonce();
let crypto = routing_table.network_manager().crypto(); let crypto = routing_table.crypto();
// Forward order (safety route), but inside-out // Forward order (safety route), but inside-out
for h in (1..safety_rsd.hops.len()).rev() { for h in (1..safety_rsd.hops.len()).rev() {
// Get blob to encrypt for next hop // Get blob to encrypt for next hop
@ -1780,7 +1870,7 @@ impl RouteSpecStore {
/// Import a remote private route for compilation /// Import a remote private route for compilation
#[instrument(level = "trace", skip(self, blob), ret, err)] #[instrument(level = "trace", skip(self, blob), ret, err)]
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<TypedKeySet> { xxx continue here, maybe formalize 'private route set' as having its own non-key identifier for both remote and local routes... just a uuid map to typedkeyset? pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<TypedKeySet> {
// decode the pr blob // decode the pr blob
let private_routes = RouteSpecStore::blob_to_private_routes(blob)?; let private_routes = RouteSpecStore::blob_to_private_routes(blob)?;

View File

@ -3,7 +3,7 @@ use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RoutedOperation { pub struct RoutedOperation {
pub sequencing: Sequencing, pub sequencing: Sequencing,
pub signatures: Vec<TypedSignature>, pub signatures: Vec<Signature>,
pub nonce: Nonce, pub nonce: Nonce,
pub data: Vec<u8>, pub data: Vec<u8>,
} }
@ -22,14 +22,14 @@ impl RoutedOperation {
reader: &veilid_capnp::routed_operation::Reader, reader: &veilid_capnp::routed_operation::Reader,
) -> Result<RoutedOperation, RPCError> { ) -> Result<RoutedOperation, RPCError> {
let sigs_reader = reader.get_signatures().map_err(RPCError::protocol)?; let sigs_reader = reader.get_signatures().map_err(RPCError::protocol)?;
let mut signatures = Vec::<TypedSignature>::with_capacity( let mut signatures = Vec::<Signature>::with_capacity(
sigs_reader sigs_reader
.len() .len()
.try_into() .try_into()
.map_err(RPCError::map_internal("too many signatures"))?, .map_err(RPCError::map_internal("too many signatures"))?,
); );
for s in sigs_reader.iter() { for s in sigs_reader.iter() {
let sig = decode_typed_signature(&s)?; let sig = decode_signature512(&s)?;
signatures.push(sig); signatures.push(sig);
} }
@ -61,7 +61,7 @@ impl RoutedOperation {
); );
for (i, sig) in self.signatures.iter().enumerate() { for (i, sig) in self.signatures.iter().enumerate() {
let mut sig_builder = sigs_builder.reborrow().get(i as u32); let mut sig_builder = sigs_builder.reborrow().get(i as u32);
encode_typed_signature(sig, &mut sig_builder); encode_signature512(sig, &mut sig_builder);
} }
let mut n_builder = builder.reborrow().init_nonce(); let mut n_builder = builder.reborrow().init_nonce();
encode_nonce(&self.nonce, &mut n_builder); encode_nonce(&self.nonce, &mut n_builder);

View File

@ -44,9 +44,6 @@ impl RPCProcessor {
&self, &self,
msg: RPCMessage, msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> { ) -> Result<NetworkResult<()>, RPCError> {
// Get the crypto kind used to send this question
let crypto_kind = msg.header.crypto_kind();
// Get the question // Get the question
let app_call_q = match msg.operation.kind() { let app_call_q = match msg.operation.kind() {
RPCOperationKind::Question(q) => match q.detail() { RPCOperationKind::Question(q) => match q.detail() {
@ -56,15 +53,20 @@ impl RPCProcessor {
_ => panic!("not a question"), _ => panic!("not a question"),
}; };
// Get the crypto kind used to send this question
let crypto_kind = msg.header.crypto_kind();
// Get the sender node id this came from
let sender = msg
.opt_sender_nr
.as_ref()
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().key);
// Register a waiter for this app call // Register a waiter for this app call
let id = msg.operation.op_id(); let id = msg.operation.op_id();
let handle = self.unlocked_inner.waiting_app_call_table.add_op_waiter(id); let handle = self.unlocked_inner.waiting_app_call_table.add_op_waiter(id);
// Pass the call up through the update callback // Pass the call up through the update callback
let sender = msg
.opt_sender_nr
.as_ref()
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().key);
let message = app_call_q.message.clone(); let message = app_call_q.message.clone();
(self.unlocked_inner.update_callback)(VeilidUpdate::AppCall(VeilidAppCall { (self.unlocked_inner.update_callback)(VeilidUpdate::AppCall(VeilidAppCall {
sender, sender,