private route work

This commit is contained in:
John Smith 2022-10-09 22:07:15 -04:00
parent 338dc6b39d
commit f7f166741b
7 changed files with 365 additions and 158 deletions

View File

@ -132,8 +132,11 @@ struct RouteHopData {
} }
struct RouteHop { struct RouteHop {
dialInfo @0 :NodeDialInfo; # dial info for this hop node :union {
nextHop @1 :RouteHopData; # Optional: next hop in encrypted blob nodeId @0 :NodeID; # node id only for established routes
peerInfo @1 :PeerInfo; # full peer info for this hop to establish the route
}
nextHop @2 :RouteHopData; # Optional: next hop in encrypted blob
# Null means no next hop, at destination (only used in private route, safety routes must enclose a stub private route) # Null means no next hop, at destination (only used in private route, safety routes must enclose a stub private route)
} }

View File

@ -102,48 +102,77 @@ impl TryFrom<String> for AttachmentState {
} }
pub struct AttachmentManagerInner { pub struct AttachmentManagerInner {
config: VeilidConfig,
attachment_machine: CallbackStateMachine<Attachment>, attachment_machine: CallbackStateMachine<Attachment>,
network_manager: NetworkManager,
maintain_peers: bool, maintain_peers: bool,
attach_timestamp: Option<u64>, attach_timestamp: Option<u64>,
update_callback: Option<UpdateCallback>, update_callback: Option<UpdateCallback>,
attachment_maintainer_jh: Option<MustJoinHandle<()>>, attachment_maintainer_jh: Option<MustJoinHandle<()>>,
} }
pub struct AttachmentManagerUnlockedInner {
config: VeilidConfig,
network_manager: NetworkManager,
}
#[derive(Clone)] #[derive(Clone)]
pub struct AttachmentManager { pub struct AttachmentManager {
inner: Arc<Mutex<AttachmentManagerInner>>, inner: Arc<Mutex<AttachmentManagerInner>>,
unlocked_inner: Arc<AttachmentManagerUnlockedInner>,
} }
impl AttachmentManager { impl AttachmentManager {
fn new_inner( fn new_unlocked_inner(
config: VeilidConfig, config: VeilidConfig,
protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
block_store: BlockStore,
crypto: Crypto, crypto: Crypto,
) -> AttachmentManagerInner { ) -> AttachmentManagerUnlockedInner {
AttachmentManagerInner { AttachmentManagerUnlockedInner {
config: config.clone(), config: config.clone(),
network_manager: NetworkManager::new(
config,
protected_store,
table_store,
block_store,
crypto,
),
}
}
fn new_inner() -> AttachmentManagerInner {
AttachmentManagerInner {
attachment_machine: CallbackStateMachine::new(), attachment_machine: CallbackStateMachine::new(),
network_manager: NetworkManager::new(config, table_store, crypto),
maintain_peers: false, maintain_peers: false,
attach_timestamp: None, attach_timestamp: None,
update_callback: None, update_callback: None,
attachment_maintainer_jh: None, attachment_maintainer_jh: None,
} }
} }
pub fn new(config: VeilidConfig, table_store: TableStore, crypto: Crypto) -> Self { pub fn new(
config: VeilidConfig,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
crypto: Crypto,
) -> Self {
Self { Self {
inner: Arc::new(Mutex::new(Self::new_inner(config, table_store, crypto))), inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(
config,
protected_store,
table_store,
block_store,
crypto,
)),
} }
} }
pub fn config(&self) -> VeilidConfig { pub fn config(&self) -> VeilidConfig {
self.inner.lock().config.clone() self.unlocked_inner.config.clone()
} }
pub fn network_manager(&self) -> NetworkManager { pub fn network_manager(&self) -> NetworkManager {
self.inner.lock().network_manager.clone() self.unlocked_inner.network_manager.clone()
} }
pub fn is_attached(&self) -> bool { pub fn is_attached(&self) -> bool {
@ -202,9 +231,10 @@ impl AttachmentManager {
AttachmentManager::translate_attachment_state(&inner.attachment_machine.state()); AttachmentManager::translate_attachment_state(&inner.attachment_machine.state());
// get reliable peer count from routing table // get reliable peer count from routing table
let routing_table = inner.network_manager.routing_table(); let routing_table = self.network_manager().routing_table();
let health = routing_table.get_routing_table_health(); let health = routing_table.get_routing_table_health();
let routing_table_config = &inner.config.get().network.routing_table; let config = self.config();
let routing_table_config = &config.get().network.routing_table;
let new_peer_state_input = let new_peer_state_input =
AttachmentManager::translate_routing_table_health(health, routing_table_config); AttachmentManager::translate_routing_table_health(health, routing_table_config);
@ -223,11 +253,8 @@ impl AttachmentManager {
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
async fn attachment_maintainer(self) { async fn attachment_maintainer(self) {
debug!("attachment starting"); debug!("attachment starting");
let netman = { self.inner.lock().attach_timestamp = Some(intf::get_timestamp());
let mut inner = self.inner.lock(); let netman = self.network_manager();
inner.attach_timestamp = Some(intf::get_timestamp());
inner.network_manager.clone()
};
let mut restart; let mut restart;
loop { loop {
@ -286,7 +313,7 @@ impl AttachmentManager {
#[instrument(level = "debug", skip_all, err)] #[instrument(level = "debug", skip_all, err)]
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
trace!("init"); trace!("init");
let network_manager = { {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.update_callback = Some(update_callback.clone()); inner.update_callback = Some(update_callback.clone());
let update_callback2 = update_callback.clone(); let update_callback2 = update_callback.clone();
@ -297,10 +324,9 @@ impl AttachmentManager {
})) }))
}, },
)); ));
inner.network_manager.clone()
}; };
network_manager.init(update_callback).await?; self.network_manager().init(update_callback).await?;
Ok(()) Ok(())
} }
@ -309,30 +335,33 @@ impl AttachmentManager {
pub async fn terminate(&self) { pub async fn terminate(&self) {
// Ensure we detached // Ensure we detached
self.detach().await; self.detach().await;
let network_manager = { self.network_manager().terminate().await;
let inner = self.inner.lock(); self.inner.lock().update_callback = None;
inner.network_manager.clone()
};
network_manager.terminate().await;
let mut inner = self.inner.lock();
inner.update_callback = None;
} }
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip(self))]
fn attach(&self) { fn attach(&self) {
// Create long-running connection maintenance routine // Create long-running connection maintenance routine
let this = self.clone(); let inner = self.inner.lock();
self.inner.lock().maintain_peers = true; if inner.attachment_maintainer_jh.is_some() {
self.inner.lock().attachment_maintainer_jh = return;
Some(intf::spawn(this.attachment_maintainer())); }
inner.maintain_peers = true;
inner.attachment_maintainer_jh = Some(intf::spawn(self.clone().attachment_maintainer()));
} }
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip(self))]
async fn detach(&self) { async fn detach(&self) {
let attachment_maintainer_jh = self.inner.lock().attachment_maintainer_jh.take(); let attachment_maintainer_jh = {
if let Some(jh) = attachment_maintainer_jh { let mut inner = self.inner.lock();
let attachment_maintainer_jh = inner.attachment_maintainer_jh.take();
if attachment_maintainer_jh.is_some() {
// Terminate long-running connection maintenance routine // Terminate long-running connection maintenance routine
self.inner.lock().maintain_peers = false; inner.maintain_peers = false;
}
attachment_maintainer_jh
};
if let Some(jh) = attachment_maintainer_jh {
jh.await; jh.await;
} }
} }

View File

@ -103,7 +103,7 @@ impl ServicesContext {
// Set up attachment manager // Set up attachment manager
trace!("init attachment manager"); trace!("init attachment manager");
let update_callback = self.update_callback.clone(); let update_callback = self.update_callback.clone();
let attachment_manager = AttachmentManager::new(self.config.clone(), table_store, crypto); let attachment_manager = AttachmentManager::new(self.config.clone(), protected_store, table_store, block_store, crypto);
if let Err(e) = attachment_manager.init(update_callback).await { if let Err(e) = attachment_manager.init(update_callback).await {
self.shutdown().await; self.shutdown().await;
return Err(e); return Err(e);

View File

@ -152,6 +152,12 @@ struct NetworkManagerInner {
} }
struct NetworkManagerUnlockedInner { struct NetworkManagerUnlockedInner {
// Handles
config: VeilidConfig,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
crypto: Crypto,
// Accessors // Accessors
routing_table: RwLock<Option<RoutingTable>>, routing_table: RwLock<Option<RoutingTable>>,
components: RwLock<Option<NetworkComponents>>, components: RwLock<Option<NetworkComponents>>,
@ -169,9 +175,6 @@ struct NetworkManagerUnlockedInner {
#[derive(Clone)] #[derive(Clone)]
pub struct NetworkManager { pub struct NetworkManager {
config: VeilidConfig,
table_store: TableStore,
crypto: Crypto,
inner: Arc<Mutex<NetworkManagerInner>>, inner: Arc<Mutex<NetworkManagerInner>>,
unlocked_inner: Arc<NetworkManagerUnlockedInner>, unlocked_inner: Arc<NetworkManagerUnlockedInner>,
} }
@ -185,9 +188,20 @@ impl NetworkManager {
public_address_inconsistencies_table: BTreeMap::new(), public_address_inconsistencies_table: BTreeMap::new(),
} }
} }
fn new_unlocked_inner(config: VeilidConfig) -> NetworkManagerUnlockedInner { fn new_unlocked_inner(
config: VeilidConfig,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
crypto: Crypto,
) -> NetworkManagerUnlockedInner {
let c = config.get(); let c = config.get();
NetworkManagerUnlockedInner { NetworkManagerUnlockedInner {
config,
protected_store,
table_store,
block_store,
crypto,
routing_table: RwLock::new(None), routing_table: RwLock::new(None),
components: RwLock::new(None), components: RwLock::new(None),
update_callback: RwLock::new(None), update_callback: RwLock::new(None),
@ -202,13 +216,22 @@ impl NetworkManager {
} }
} }
pub fn new(config: VeilidConfig, table_store: TableStore, crypto: Crypto) -> Self { pub fn new(
config: VeilidConfig,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
crypto: Crypto,
) -> Self {
let this = Self { let this = Self {
config: config.clone(),
table_store,
crypto,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config)), unlocked_inner: Arc::new(Self::new_unlocked_inner(
config,
protected_store,
table_store,
block_store,
crypto,
)),
}; };
// Set rolling transfers tick task // Set rolling transfers tick task
{ {
@ -323,13 +346,25 @@ impl NetworkManager {
this this
} }
pub fn config(&self) -> VeilidConfig { pub fn config(&self) -> VeilidConfig {
self.config.clone() self.unlocked_inner.config.clone()
}
pub fn with_config<F, R>(&self, f: F) -> R
where
F: FnOnce(&VeilidConfigInner) -> R,
{
f(&*self.unlocked_inner.config.get())
}
pub fn protected_store(&self) -> ProtectedStore {
self.unlocked_inner.protected_store.clone()
} }
pub fn table_store(&self) -> TableStore { pub fn table_store(&self) -> TableStore {
self.table_store.clone() self.unlocked_inner.table_store.clone()
}
pub fn block_store(&self) -> BlockStore {
self.unlocked_inner.block_store.clone()
} }
pub fn crypto(&self) -> Crypto { pub fn crypto(&self) -> Crypto {
self.crypto.clone() self.unlocked_inner.crypto.clone()
} }
pub fn routing_table(&self) -> RoutingTable { pub fn routing_table(&self) -> RoutingTable {
self.unlocked_inner self.unlocked_inner
@ -540,7 +575,7 @@ impl NetworkManager {
} }
pub fn purge_client_whitelist(&self) { pub fn purge_client_whitelist(&self) {
let timeout_ms = self.config.get().network.client_whitelist_timeout_ms; let timeout_ms = self.with_config(|c| c.network.client_whitelist_timeout_ms);
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let cutoff_timestamp = intf::get_timestamp() - ((timeout_ms as u64) * 1000u64); let cutoff_timestamp = intf::get_timestamp() - ((timeout_ms as u64) * 1000u64);
// Remove clients from the whitelist that haven't been since since our whitelist timeout // Remove clients from the whitelist that haven't been since since our whitelist timeout
@ -576,10 +611,7 @@ impl NetworkManager {
RoutingDomain::PublicInternet.into(), RoutingDomain::PublicInternet.into(),
BucketEntryState::Unreliable, BucketEntryState::Unreliable,
); );
let min_peer_count = { let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize);
let c = self.config.get();
c.network.dht.min_peer_count as usize
};
// If none, then add the bootstrap nodes to it // If none, then add the bootstrap nodes to it
if live_public_internet_entry_count == 0 { if live_public_internet_entry_count == 0 {
@ -857,7 +889,7 @@ impl NetworkManager {
// Encode envelope // Encode envelope
let envelope = Envelope::new(version, ts, nonce, node_id, dest_node_id); let envelope = Envelope::new(version, ts, nonce, node_id, dest_node_id);
envelope envelope
.to_encrypted_data(self.crypto.clone(), body.as_ref(), &node_id_secret) .to_encrypted_data(self.crypto(), body.as_ref(), &node_id_secret)
.wrap_err("envelope failed to encode") .wrap_err("envelope failed to encode")
} }
@ -1095,6 +1127,11 @@ impl NetworkManager {
} }
} }
/// Get the contact method required for node A to reach node B
pub fn get_node_contact_method(node_a: &NodeInfo, node_b: &NodeInfo) -> ContactMethod {
unimplemented!();
}
// Send a reverse connection signal and wait for the return receipt over it // Send a reverse connection signal and wait for the return receipt over it
// Then send the data across the new connection // Then send the data across the new connection
// Only usable for PublicInternet routing domain // Only usable for PublicInternet routing domain
@ -1106,8 +1143,13 @@ impl NetworkManager {
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Build a return receipt for the signal // Build a return receipt for the signal
let receipt_timeout = let receipt_timeout = ms_to_us(
ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); self.unlocked_inner
.config
.get()
.network
.reverse_connection_receipt_time_ms,
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
// Get our peer info // Get our peer info
@ -1188,7 +1230,13 @@ impl NetworkManager {
.unwrap_or_default()); .unwrap_or_default());
// Build a return receipt for the signal // Build a return receipt for the signal
let receipt_timeout = ms_to_us(self.config.get().network.hole_punch_receipt_time_ms); let receipt_timeout = ms_to_us(
self.unlocked_inner
.config
.get()
.network
.hole_punch_receipt_time_ms,
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
// Get our peer info // Get our peer info
let peer_info = self let peer_info = self
@ -1404,10 +1452,7 @@ impl NetworkManager {
// Direct bootstrap request // Direct bootstrap request
#[instrument(level = "trace", err, skip(self))] #[instrument(level = "trace", err, skip(self))]
pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<PeerInfo>> { pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<PeerInfo>> {
let timeout_ms = { let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms);
let c = self.config.get();
c.network.rpc.timeout_ms
};
// Send boot magic to requested peer address // Send boot magic to requested peer address
let data = BOOT_MAGIC.to_vec(); let data = BOOT_MAGIC.to_vec();
let out_data: Vec<u8> = network_result_value_or_log!(debug self let out_data: Vec<u8> = network_result_value_or_log!(debug self
@ -1502,13 +1547,12 @@ impl NetworkManager {
}; };
// Get timestamp range // Get timestamp range
let (tsbehind, tsahead) = { let (tsbehind, tsahead) = self.with_config(|c| {
let c = self.config.get();
( (
c.network.rpc.max_timestamp_behind_ms.map(ms_to_us), c.network.rpc.max_timestamp_behind_ms.map(ms_to_us),
c.network.rpc.max_timestamp_ahead_ms.map(ms_to_us), c.network.rpc.max_timestamp_ahead_ms.map(ms_to_us),
) )
}; });
// Validate timestamp isn't too old // Validate timestamp isn't too old
let ts = intf::get_timestamp(); let ts = intf::get_timestamp();
@ -1742,11 +1786,14 @@ impl NetworkManager {
} }
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let c = self.config.get(); let (detect_address_changes, ip6_prefix_size) = self.with_config(|c| {
let detect_address_changes = c.network.detect_address_changes; (
c.network.detect_address_changes,
c.network.max_connections_per_ip6_prefix_size as usize,
)
});
// Get the ip(block) this report is coming from // Get the ip(block) this report is coming from
let ip6_prefix_size = c.network.max_connections_per_ip6_prefix_size as usize;
let ipblock = ip_to_ipblock( let ipblock = ip_to_ipblock(
ip6_prefix_size, ip6_prefix_size,
connection_descriptor.remote_address().to_ip_addr(), connection_descriptor.remote_address().to_ip_addr(),

View File

@ -4,9 +4,14 @@ use serde::*;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
struct RouteSpecDetail { struct RouteSpecDetail {
/// The actual route spec /// Secret key
#[serde(with = "arc_serialize")] #[serde(skip)]
route_spec: Arc<RouteSpec>, secret_key: DHTKeySecret,
/// Route hops
hops: Vec<DHTKey>,
/// Route noderefs
#[serde(skip)]
hop_node_refs: Vec<NodeRef>,
/// Transfers up and down /// Transfers up and down
transfer_stats_down_up: TransferStatsDownUp, transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats /// Latency stats
@ -18,10 +23,15 @@ struct RouteSpecDetail {
#[serde(skip)] #[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting, transfer_stats_accounting: TransferStatsAccounting,
/// Published private route, do not reuse for ephemeral routes /// Published private route, do not reuse for ephemeral routes
/// Not serialized because all routes should be re-published when restarting
#[serde(skip)] #[serde(skip)]
published: bool, published: bool,
/// Timestamp of when the route was created /// Timestamp of when the route was created
timestamp: u64, created_ts: u64,
/// Timestamp of when the route was last checked for validity
last_checked_ts: Option<u64>,
/// Directions this route is guaranteed to work in
directions: DirectionSet,
} }
/// The core representation of the RouteSpecStore that can be serialized /// The core representation of the RouteSpecStore that can be serialized
@ -34,10 +44,6 @@ pub struct RouteSpecStoreContent {
/// Ephemeral data used to help the RouteSpecStore operate efficiently /// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct RouteSpecStoreCache { pub struct RouteSpecStoreCache {
/// The fastest routes by latency
fastest_routes: Vec<DHTKey>,
/// The most reliable routes by node lifetime longevity
reliable_routes: Vec<DHTKey>,
/// How many times nodes have been used /// How many times nodes have been used
used_nodes: HashMap<DHTKey, usize>, used_nodes: HashMap<DHTKey, usize>,
/// How many times nodes have been used at the terminal point of a route /// How many times nodes have been used at the terminal point of a route
@ -62,14 +68,76 @@ fn route_spec_to_hop_cache(spec: Arc<RouteSpec>) -> Vec<u8> {
cache cache
} }
fn node_sublist_to_hop_cache( /// number of route permutations is the number of unique orderings
nodes: &[(DHTKey, Arc<BucketEntry>)], /// for a set of nodes, given that the first node is fixed
start: usize, fn get_route_permutation_count(hop_count: usize) -> usize {
len: usize, if hop_count == 0 {
) -> Vec<u8> { unreachable!();
let mut cache: Vec<u8> = Vec::with_capacity(len * DHT_KEY_LENGTH); }
for node in &nodes[start..start + len] { // a single node or two nodes is always fixed
cache.extend_from_slice(&node.0.bytes) if hop_count == 1 || hop_count == 2 {
return 1;
}
// more than two nodes has factorial permutation
// hop_count = 3 -> 2! -> 2
// hop_count = 4 -> 3! -> 6
(3..hop_count).into_iter().fold(2usize, |acc, x| acc * x)
}
/// get the route permutation at particular 'perm' index, starting at the 'start' index
/// for a set of 'hop_count' nodes. the first node is always fixed, and the maximum
/// number of permutations is given by get_route_permutation_count()
fn with_route_permutations<F>(hop_count: usize, start: usize, f: F) -> bool
where
F: FnMut(&[usize]) -> bool,
{
if hop_count == 0 {
unreachable!();
}
// initial permutation
let mut permutation: Vec<usize> = Vec::with_capacity(hop_count);
for n in 0..hop_count {
permutation[n] = start + n;
}
// if we have one hop or two, then there's only one permutation
if hop_count == 1 || hop_count == 2 {
return f(&permutation);
}
// heaps algorithm
fn heaps_permutation<F>(permutation: &mut [usize], size: usize, f: F) -> bool
where
F: FnMut(&[usize]) -> bool,
{
if size == 1 {
if f(&permutation) {
return true;
}
return false;
}
for i in 0..size {
if heaps_permutation(permutation, size - 1, f) {
return true;
}
if size % 2 == 1 {
permutation.swap(1, size);
} else {
permutation.swap(1 + i, size);
}
}
false
}
// recurse
heaps_permutation(&mut permutation, hop_count - 1, f)
}
/// get the hop cache key for a particular route permutation
fn route_permutation_to_hop_cache(nodes: &[(DHTKey, NodeInfo)], perm: &[usize]) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(perm.len() * DHT_KEY_LENGTH);
for n in perm {
cache.extend_from_slice(&nodes[*n].0.bytes)
} }
cache cache
} }
@ -84,30 +152,33 @@ impl RouteSpecStore {
} }
} }
pub fn from_cbor( pub fn load(routing_table: RoutingTable) -> Result<RouteSpecStore, VeilidAPIError> {
routing_table: RoutingTable, // Get cbor blob from table store
cbor: &[u8],
) -> Result<RouteSpecStore, VeilidAPIError> {
let content: RouteSpecStoreContent = serde_cbor::from_slice(cbor) let content: RouteSpecStoreContent = serde_cbor::from_slice(cbor)
.map_err(|e| VeilidAPIError::parse_error("invalid route spec store content", e))?; .map_err(|e| VeilidAPIError::parse_error("invalid route spec store content", e))?;
let rss = RouteSpecStore { let rss = RouteSpecStore {
content, content,
cache: Default::default(), cache: Default::default(),
}; };
rss.rebuild_cache(); rss.rebuild_cache(routing_table);
Ok(rss) Ok(rss)
} }
pub fn to_cbor(&self) -> Vec<u8> { pub fn save(&self, routing_table: RoutingTable) -> Result<(), VeilidAPIError> {
serde_cbor::to_vec(&self.content).unwrap() // Save all the fields we care about to the cbor blob in table storage
let cbor = serde_cbor::to_vec(&self.content).unwrap();
let table_store = routing_table.network_manager().table_store();
table_store.open("")
} }
fn rebuild_cache(&mut self) { fn rebuild_cache(&mut self, routing_table: RoutingTable) {
// //
// xxx also load secrets from pstore
let pstore = routing_table.network_manager().protected_store();
} }
fn detail_mut(&mut self, spec: Arc<RouteSpec>) -> &mut RouteSpecDetail { fn detail_mut(&mut self, public_key: DHTKey) -> &mut RouteSpecDetail {
self.content.details.get_mut(&spec.public_key).unwrap() self.content.details.get_mut(&public_key).unwrap()
} }
/// Create a new route /// Create a new route
@ -119,7 +190,8 @@ impl RouteSpecStore {
routing_table: RoutingTable, routing_table: RoutingTable,
reliable: bool, reliable: bool,
hop_count: usize, hop_count: usize,
) -> Option<Arc<RouteSpec>> { directions: DirectionSet,
) -> Option<DHTKey> {
use core::cmp::Ordering; use core::cmp::Ordering;
let max_route_hop_count = { let max_route_hop_count = {
@ -257,51 +329,101 @@ impl RouteSpecStore {
} }
// Now go through nodes and try to build a route we haven't seen yet // Now go through nodes and try to build a route we haven't seen yet
let mut route_nodes = None; let mut route_nodes: Vec<usize> = Vec::with_capacity(hop_count);
for start in 0..(nodes.len() - hop_count) { for start in 0..(nodes.len() - hop_count) {
// Try the permutations available starting with 'start'
let done = with_route_permutations(hop_count, start, |permutation: &[usize]| {
// Get the route cache key // Get the route cache key
let key = node_sublist_to_hop_cache(&nodes, start, hop_count); let key = route_permutation_to_hop_cache(&nodes, permutation);
// try each route until we find a unique one // Skip routes we have already seen
if !self.cache.hop_cache.contains(&key) { if self.cache.hop_cache.contains(&key) {
route_nodes = Some(&nodes[start..start + hop_count]); return false;
}
// Ensure this route is viable by checking that each node can contact the next one
if directions.contains(Direction::Outbound) {
let our_node_info =
routing_table.get_own_node_info(RoutingDomain::PublicInternet);
let mut previous_node_info = &our_node_info;
let mut reachable = true;
for n in permutation {
let current_node_info = &nodes.get(*n).as_ref().unwrap().1;
let cm = NetworkManager::get_node_contact_method(
previous_node_info,
current_node_info,
);
if matches!(cm, ContactMethod::Unreachable) {
reachable = false;
break;
}
previous_node_info = current_node_info;
}
if !reachable {
return false;
}
}
if directions.contains(Direction::Inbound) {
let our_node_info =
routing_table.get_own_node_info(RoutingDomain::PublicInternet);
let mut next_node_info = &our_node_info;
let mut reachable = true;
for n in permutation.iter().rev() {
let current_node_info = &nodes.get(*n).as_ref().unwrap().1;
let cm = NetworkManager::get_node_contact_method(
current_node_info,
next_node_info,
);
if matches!(cm, ContactMethod::Unreachable) {
reachable = false;
break;
}
next_node_info = current_node_info;
}
if !reachable {
return false;
}
}
// Keep this route
route_nodes = permutation.to_vec();
true
});
if done {
break; break;
} }
} }
if route_nodes.is_none() { if route_nodes.is_empty() {
return None; return None;
} }
let route_node = route_nodes.unwrap();
// Got a unique route, lets build the detail, register it, and return it // Got a unique route, lets build the detail, register it, and return it
let hops: Vec<RouteHopSpec> = route_node let hops = route_nodes.iter().map(|v| nodes[*v].0).collect();
.into_iter() let hop_node_refs = route_nodes
.map(|v| RouteHopSpec { .iter()
dial_info: NodeDialInfo { .map(|v| routing_table.lookup_node_ref(nodes[*v].0).unwrap())
node_id: NodeId::new(v.0),
dial_info: xxx,
},
})
.collect(); .collect();
let (public_key, secret_key) = generate_secret(); let (public_key, secret_key) = generate_secret();
let route_spec = Arc::new(RouteSpec {
public_key,
secret_key,
hops,
});
let rsd = RouteSpecDetail { let rsd = RouteSpecDetail {
route_spec, secret_key,
hops,
hop_node_refs,
transfer_stats_down_up: Default::default(), transfer_stats_down_up: Default::default(),
latency_stats: Default::default(), latency_stats: Default::default(),
latency_stats_accounting: Default::default(), latency_stats_accounting: Default::default(),
transfer_stats_accounting: Default::default(), transfer_stats_accounting: Default::default(),
published: false, published: false,
timestamp: cur_ts, created_ts: cur_ts,
last_checked_ts: None,
directions,
}; };
None self.content.details.insert(public_key, rsd);
// xxx insert into cache too
Some(public_key)
} }
pub fn release_route(&mut self, spec: Arc<RouteSpec>) {} pub fn release_route(&mut self, spec: Arc<RouteSpec>) {}
@ -311,15 +433,22 @@ impl RouteSpecStore {
/// Mark route as published /// Mark route as published
/// When first deserialized, routes must be re-published in order to ensure they remain /// When first deserialized, routes must be re-published in order to ensure they remain
/// in the RouteSpecStore. /// in the RouteSpecStore.
pub fn publish_route(&mut self, spec: Arc<RouteSpec>) { pub fn mark_route_published(&mut self, spec: Arc<RouteSpec>) {
//compile private route here? self.detail_mut(spec).published = true;
} }
pub fn record_latency( /// Mark route as checked
&mut self, pub fn touch_route_checked(&mut self, spec: Arc<RouteSpec>, cur_ts: u64) {
spec: Arc<RouteSpec>, self.detail_mut(spec).last_checked_ts = cur_ts;
latency: u64, }
) -> veilid_api::LatencyStats {
pub fn record_latency(&mut self, spec: Arc<RouteSpec>, latency: u64) {
let lsa = self.detail_mut(spec).latency_stats_accounting;
self.detail_mut(spec).latency_stats = lsa.record_latency(latency);
}
pub fn latency_stats(&self, spec: Arc<RouteSpec>) -> LatencyStats {
self.detail_mut(spec).latency_stats.clone()
} }
pub fn add_down(&mut self, spec: Arc<RouteSpec>, bytes: u64) { pub fn add_down(&mut self, spec: Arc<RouteSpec>, bytes: u64) {

View File

@ -645,6 +645,14 @@ impl NodeInfo {
} }
} }
#[allow(clippy::derive_hash_xor_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
pub enum Direction {
Inbound,
Outbound,
}
pub type DirectionSet = EnumSet<Direction>;
#[allow(clippy::derive_hash_xor_eq)] #[allow(clippy::derive_hash_xor_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] #[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
// Keep member order appropriate for sorting < preference // Keep member order appropriate for sorting < preference

View File

@ -1,32 +1,5 @@
use super::*; use super::*;
/////////////////////////////////////////////////////////////////////////////////////////////////////
// Privacy Specs
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct RouteHopSpec {
pub dial_info: NodeDialInfo,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct RouteSpec {
//
pub public_key: DHTKey,
pub secret_key: DHTKeySecret,
pub hops: Vec<RouteHopSpec>,
}
impl RouteSpec {
pub fn new() -> Self {
let (pk, sk) = generate_secret();
RouteSpec {
public_key: pk,
secret_key: sk,
hops: Vec::new(),
}
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
// Compiled Privacy Objects // Compiled Privacy Objects
@ -36,9 +9,27 @@ pub struct RouteHopData {
pub blob: Vec<u8>, pub blob: Vec<u8>,
} }
#[derive(Clone, Debug)]
pub enum RouteNode {
NodeId(DHTKey),
PeerInfo(PeerInfo),
}
impl fmt::Display for RouteNode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{}",
match self {
RouteNode::NodeId(x) => x.encode(),
RouteNode::PeerInfo(pi) => pi.node_id.key.encode(),
}
)
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RouteHop { pub struct RouteHop {
pub dial_info: NodeDialInfo, pub node: RouteNode,
pub next_hop: Option<RouteHopData>, pub next_hop: Option<RouteHopData>,
} }
@ -46,7 +37,7 @@ pub struct RouteHop {
pub struct PrivateRoute { pub struct PrivateRoute {
pub public_key: DHTKey, pub public_key: DHTKey,
pub hop_count: u8, pub hop_count: u8,
pub hops: Option<RouteHop>, pub first_hop: Option<RouteHop>,
} }
impl PrivateRoute { impl PrivateRoute {
@ -54,7 +45,7 @@ impl PrivateRoute {
Self { Self {
public_key, public_key,
hop_count: 0, hop_count: 0,
hops: None, first_hop: None,
} }
} }
} }
@ -66,8 +57,8 @@ impl fmt::Display for PrivateRoute {
"PR({:?}+{}{})", "PR({:?}+{}{})",
self.public_key, self.public_key,
self.hop_count, self.hop_count,
if let Some(hops) = &self.hops { if let Some(first_hop) = &self.first_hop {
format!("->{}", hops.dial_info) format!("->{}", first_hop.node)
} else { } else {
"".to_owned() "".to_owned()
} }