checkpoint

This commit is contained in:
John Smith 2022-11-22 18:26:39 -05:00
parent 27f7f49d4f
commit dec7bd27da
7 changed files with 280 additions and 88 deletions

View File

@ -3,6 +3,7 @@ mod bucket_entry;
mod debug; mod debug;
mod node_ref; mod node_ref;
mod node_ref_filter; mod node_ref_filter;
mod privacy;
mod route_spec_store; mod route_spec_store;
mod routing_domain_editor; mod routing_domain_editor;
mod routing_domains; mod routing_domains;
@ -21,6 +22,7 @@ pub use debug::*;
use hashlink::LruCache; use hashlink::LruCache;
pub use node_ref::*; pub use node_ref::*;
pub use node_ref_filter::*; pub use node_ref_filter::*;
pub use privacy::*;
pub use route_spec_store::*; pub use route_spec_store::*;
pub use routing_domain_editor::*; pub use routing_domain_editor::*;
pub use routing_domains::*; pub use routing_domains::*;

View File

@ -81,14 +81,14 @@ pub struct RouteSpecStoreContent {
/// What remote private routes have seen /// What remote private routes have seen
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
struct RemotePrivateRouteInfo { struct RemotePrivateRouteInfo {
// When this remote private route was last modified // The private route itself
modified_ts: u64, private_route: Option<PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use /// Timestamp of when the route was last used for anything
seen_our_node_info: bool, last_used_ts: u64,
/// The time this remote private route last responded /// The time this remote private route last responded
last_replied_ts: Option<u64>, last_replied_ts: Option<u64>,
/// Timestamp of when the route was last used for anything /// Did this remote private route see our node info due to no safety route in use
last_used_ts: Option<u64>, seen_our_node_info: bool,
} }
/// Ephemeral data used to help the RouteSpecStore operate efficiently /// Ephemeral data used to help the RouteSpecStore operate efficiently
@ -743,7 +743,7 @@ impl RouteSpecStore {
} }
} }
} }
// We got the correct signatures, return a key ans // We got the correct signatures, return a key and response safety spec
Ok(Some(( Ok(Some((
rsd.secret_key, rsd.secret_key,
SafetySpec { SafetySpec {
@ -755,9 +755,48 @@ impl RouteSpecStore {
))) )))
} }
/// Test an allocated route for continuity
pub async fn test_route(&self, key: &DHTKey) -> EyreResult<bool> {
let inner = &mut *self.inner.lock();
let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
// Target is the last hop
let target = rsd.hop_node_refs.last().unwrap().clone();
let hop_count = rsd.hops.len();
let stability = rsd.stability;
let sequencing = rsd.sequencing;
// Test with ping to end
let res = match rpc_processor
.rpc_call_status(Destination::Direct {
target,
safety_selection: SafetySelection::Safe(SafetySpec {
preferred_route: Some(key.clone()),
hop_count,
stability,
sequencing,
}),
})
.await?
{
NetworkResult::Value(v) => v,
_ => {
// Did not error, but did not come back, just return false
return Ok(false);
}
};
Ok(true)
}
/// Release an allocated route that is no longer in use
pub fn release_route(&self, public_key: DHTKey) -> EyreResult<()> { pub fn release_route(&self, public_key: DHTKey) -> EyreResult<()> {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if let Some(detail) = inner.content.details.remove(&public_key) { let Some(detail) = inner.content.details.remove(&public_key) else {
bail!("can't release route that was never allocated");
};
// Remove from hop cache // Remove from hop cache
let cache_key = route_hops_to_hop_cache(&detail.hops); let cache_key = route_hops_to_hop_cache(&detail.hops);
if !inner.cache.hop_cache.remove(&cache_key) { if !inner.cache.hop_cache.remove(&cache_key) {
@ -793,9 +832,6 @@ impl RouteSpecStore {
panic!("used_end_nodes cache should have contained hop"); panic!("used_end_nodes cache should have contained hop");
} }
} }
} else {
bail!("can't release route that was never allocated");
}
Ok(()) Ok(())
} }
@ -1205,37 +1241,66 @@ impl RouteSpecStore {
Ok(private_route) Ok(private_route)
} }
/// Import a remote private route for compilation
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<DHTKey> {
// decode the pr blob
let private_route = RouteSpecStore::blob_to_private_route(blob)?;
// store the private route in our cache
let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp();
let key = Self::with_create_remote_private_route(inner, cur_ts, private_route, |r| {
r.private_route.as_ref().unwrap().public_key.clone()
});
Ok(key)
}
/// Retrieve an imported remote private route by its public key
pub fn get_remote_private_route(&self, key: &DHTKey) -> EyreResult<PrivateRoute> {
let inner = &mut *self.inner.lock();
let cur_ts = intf::get_timestamp();
let Some(pr) = Self::with_get_remote_private_route(inner, cur_ts, key, |r| {
r.private_route.as_ref().unwrap().clone()
}) else {
bail!("remote private route not found");
};
Ok(pr)
}
// get or create a remote private route cache entry // get or create a remote private route cache entry
fn with_create_remote_private_route<F, R>( fn with_create_remote_private_route<F, R>(
inner: &mut RouteSpecStoreInner, inner: &mut RouteSpecStoreInner,
cur_ts: u64, cur_ts: u64,
key: &DHTKey, private_route: PrivateRoute,
f: F, f: F,
) -> R ) -> R
where where
F: FnOnce(&mut RemotePrivateRouteInfo) -> R, F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{ {
let pr_pubkey = private_route.public_key;
let rpr = inner let rpr = inner
.cache .cache
.remote_private_route_cache .remote_private_route_cache
.entry(*key) .entry(pr_pubkey)
.and_modify(|rpr| { .and_modify(|rpr| {
if cur_ts - rpr.modified_ts >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { if cur_ts - rpr.last_used_ts >= REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
*rpr = RemotePrivateRouteInfo { // Start fresh if this had expired
modified_ts: cur_ts, rpr.last_used_ts = cur_ts;
seen_our_node_info: false, rpr.last_replied_ts = None;
last_replied_ts: None, rpr.seen_our_node_info = false;
last_used_ts: None,
};
} else { } else {
rpr.modified_ts = cur_ts; // If not expired, just mark as being used
rpr.last_used_ts = cur_ts;
} }
}) })
.or_insert_with(|| RemotePrivateRouteInfo { .or_insert_with(|| RemotePrivateRouteInfo {
modified_ts: cur_ts, // New remote private route cache entry
seen_our_node_info: false, private_route: Some(private_route),
last_used_ts: cur_ts,
last_replied_ts: None, last_replied_ts: None,
last_used_ts: None, seen_our_node_info: false,
}); });
f(rpr) f(rpr)
} }
@ -1248,10 +1313,10 @@ impl RouteSpecStore {
f: F, f: F,
) -> Option<R> ) -> Option<R>
where where
F: FnOnce(&RemotePrivateRouteInfo) -> R, F: FnOnce(&mut RemotePrivateRouteInfo) -> R,
{ {
let rpr = inner.cache.remote_private_route_cache.get(key)?; let rpr = inner.cache.remote_private_route_cache.get_mut(key)?;
if cur_ts - rpr.modified_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY { if cur_ts - rpr.last_used_ts < REMOTE_PRIVATE_ROUTE_CACHE_EXPIRY {
return Some(f(rpr)); return Some(f(rpr));
} }
inner.cache.remote_private_route_cache.remove(key); inner.cache.remote_private_route_cache.remove(key);
@ -1269,27 +1334,46 @@ impl RouteSpecStore {
} }
/// Mark a remote private route as having seen our node info /// Mark a remote private route as having seen our node info
pub fn mark_remote_private_route_seen_our_node_info(&self, key: &DHTKey, cur_ts: u64) { pub fn mark_remote_private_route_seen_our_node_info(
&self,
key: &DHTKey,
cur_ts: u64,
) -> EyreResult<()> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| { if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| {
rpr.seen_our_node_info = true; rpr.seen_our_node_info = true;
}) })
.is_none()
{
bail!("private route is missing from store: {}", key);
}
Ok(())
} }
/// Mark a remote private route as having replied to a question { /// Mark a remote private route as having replied to a question {
pub fn mark_remote_private_route_replied(&self, key: &DHTKey, cur_ts: u64) { pub fn mark_remote_private_route_replied(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| { if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| {
rpr.last_replied_ts = Some(cur_ts); rpr.last_replied_ts = Some(cur_ts);
}) })
.is_none()
{
bail!("private route is missing from store: {}", key);
}
Ok(())
} }
/// Mark a remote private route as having beed used { /// Mark a remote private route as having beed used {
pub fn mark_remote_private_route_used(&self, key: &DHTKey, cur_ts: u64) { pub fn mark_remote_private_route_used(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| { if Self::with_get_remote_private_route(inner, cur_ts, key, |rpr| {
rpr.last_used_ts = Some(cur_ts); rpr.last_used_ts = cur_ts;
}) })
.is_none()
{
bail!("private route is missing from store: {}", key);
}
Ok(())
} }
/// Clear caches when local our local node info changes /// Clear caches when local our local node info changes
@ -1306,8 +1390,11 @@ impl RouteSpecStore {
v.last_checked_ts = None; v.last_checked_ts = None;
} }
// Clean up remote private routes // Reset private route cache
inner.cache.remote_private_route_cache.clear(); for (_k, v) in &mut inner.cache.remote_private_route_cache {
v.last_replied_ts = None;
v.seen_our_node_info = false;
}
} }
/// Mark route as published /// Mark route as published

View File

@ -452,15 +452,21 @@ impl RPCProcessor {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table.route_spec_store();
// If we received a reply from a private route, mark it as such // If we received a reply from a private route, mark it as such
rss.mark_remote_private_route_replied(&private_route.public_key, recv_ts); if let Err(e) =
rss.mark_remote_private_route_replied(&private_route.public_key, recv_ts)
{
log_rpc!(error "private route missing: {}", e);
}
// If we sent to a private route without a safety route // If we sent to a private route without a safety route
// We need to mark our own node info as having been seen so we can optimize sending it // We need to mark our own node info as having been seen so we can optimize sending it
if let SafetySelection::Unsafe(_) = safety_selection { if let SafetySelection::Unsafe(_) = safety_selection {
rss.mark_remote_private_route_seen_our_node_info( if let Err(e) = rss.mark_remote_private_route_seen_our_node_info(
&private_route.public_key, &private_route.public_key,
recv_ts, recv_ts,
); ) {
log_rpc!(error "private route missing: {}", e);
}
} }
} }
} }
@ -761,7 +767,11 @@ impl RPCProcessor {
} = &dest } = &dest
{ {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table.route_spec_store();
rss.mark_remote_private_route_used(&private_route.public_key, intf::get_timestamp()); if let Err(e) =
rss.mark_remote_private_route_used(&private_route.public_key, intf::get_timestamp())
{
log_rpc!(error "private route missing: {}", e);
}
} }
// Pass back waitable reply completion // Pass back waitable reply completion

View File

@ -7,7 +7,7 @@ use routing_table::*;
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct DebugCache { struct DebugCache {
imported_routes: Vec<PrivateRoute>, imported_routes: Vec<DHTKey>,
} }
static DEBUG_CACHE: Mutex<DebugCache> = Mutex::new(DebugCache { static DEBUG_CACHE: Mutex<DebugCache> = Mutex::new(DebugCache {
@ -123,10 +123,20 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<D
// Private route // Private route
let text = &text[1..]; let text = &text[1..];
let n = get_number(text)?; let n = get_number(text)?;
let dc = DEBUG_CACHE.lock(); let mut dc = DEBUG_CACHE.lock();
let r = dc.imported_routes.get(n)?; let pr_pubkey = dc.imported_routes.get(n)?;
let rss = routing_table.route_spec_store();
let private_route = match rss.get_remote_private_route(&pr_pubkey) {
Err(_) => {
// Remove imported route
dc.imported_routes.remove(n);
info!("removed dead imported route {}", n);
return None;
}
Ok(v) => v,
};
Some(Destination::private_route( Some(Destination::private_route(
r.clone(), private_route,
ss.unwrap_or(SafetySelection::Unsafe(Sequencing::NoPreference)), ss.unwrap_or(SafetySelection::Unsafe(Sequencing::NoPreference)),
)) ))
} else { } else {
@ -734,17 +744,24 @@ impl VeilidAPI {
let blob_dec = BASE64URL_NOPAD let blob_dec = BASE64URL_NOPAD
.decode(blob.as_bytes()) .decode(blob.as_bytes())
.map_err(VeilidAPIError::generic)?; .map_err(VeilidAPIError::generic)?;
let pr = let rss = self.routing_table()?.route_spec_store();
RouteSpecStore::blob_to_private_route(blob_dec).map_err(VeilidAPIError::generic)?; let pr_pubkey = rss
.import_remote_private_route(blob_dec)
.map_err(VeilidAPIError::generic)?;
let mut dc = DEBUG_CACHE.lock(); let mut dc = DEBUG_CACHE.lock();
let n = dc.imported_routes.len(); let n = dc.imported_routes.len();
let out = format!("Private route #{} imported: {}", n, pr.public_key); let out = format!("Private route #{} imported: {}", n, pr_pubkey);
dc.imported_routes.push(pr); dc.imported_routes.push(pr_pubkey);
return Ok(out); return Ok(out);
} }
async fn debug_route_test(&self, _args: Vec<String>) -> Result<String, VeilidAPIError> {
let out = "xxx".to_string();
return Ok(out);
}
async fn debug_route(&self, args: String) -> Result<String, VeilidAPIError> { async fn debug_route(&self, args: String) -> Result<String, VeilidAPIError> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect(); let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
@ -764,6 +781,8 @@ impl VeilidAPI {
self.debug_route_list(args).await self.debug_route_list(args).await
} else if command == "import" { } else if command == "import" {
self.debug_route_import(args).await self.debug_route_import(args).await
} else if command == "test" {
self.debug_route_test(args).await
} else { } else {
Ok(">>> Unknown command\n".to_owned()) Ok(">>> Unknown command\n".to_owned())
} }
@ -791,6 +810,7 @@ impl VeilidAPI {
print <route> print <route>
list list
import <blob> import <blob>
test <route>
<destination> is: <destination> is:
* direct: <node>[+<safety>][<modifiers>] * direct: <node>[+<safety>][<modifiers>]

View File

@ -1,12 +1,10 @@
#![allow(dead_code)] #![allow(dead_code)]
mod debug; mod debug;
mod privacy;
mod routing_context; mod routing_context;
mod serialize_helpers; mod serialize_helpers;
pub use debug::*; pub use debug::*;
pub use privacy::*;
pub use routing_context::*; pub use routing_context::*;
pub use serialize_helpers::*; pub use serialize_helpers::*;
@ -25,12 +23,13 @@ pub use intf::BlockStore;
pub use intf::ProtectedStore; pub use intf::ProtectedStore;
pub use intf::TableStore; pub use intf::TableStore;
pub use network_manager::NetworkManager; pub use network_manager::NetworkManager;
pub use routing_table::{NodeRef, NodeRefBase, RoutingTable}; pub use routing_table::{NodeRef, NodeRefBase};
use core::fmt; use core::fmt;
use core_context::{api_shutdown, VeilidCoreContext}; use core_context::{api_shutdown, VeilidCoreContext};
use enumset::*; use enumset::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use routing_table::{RouteSpecStore, RoutingTable};
use rpc_processor::*; use rpc_processor::*;
use serde::*; use serde::*;
use xx::*; use xx::*;
@ -86,8 +85,8 @@ pub enum VeilidAPIError {
Timeout, Timeout,
#[error("Shutdown")] #[error("Shutdown")]
Shutdown, Shutdown,
#[error("Node not found: {node_id}")] #[error("Key not found: {key}")]
NodeNotFound { node_id: NodeId }, KeyNotFound { key: DHTKey },
#[error("No connection: {message}")] #[error("No connection: {message}")]
NoConnection { message: String }, NoConnection { message: String },
#[error("No peer info: {node_id}")] #[error("No peer info: {node_id}")]
@ -123,11 +122,13 @@ impl VeilidAPIError {
pub fn shutdown() -> Self { pub fn shutdown() -> Self {
Self::Shutdown Self::Shutdown
} }
pub fn node_not_found(node_id: NodeId) -> Self { pub fn key_not_found(key: DHTKey) -> Self {
Self::NodeNotFound { node_id } Self::KeyNotFound { key }
}
pub fn no_connection<T: ToString>(msg: T) -> Self {
Self::NoConnection {
message: msg.to_string(),
} }
pub fn no_connection(message: String) -> Self {
Self::NoConnection { message }
} }
pub fn no_peer_info(node_id: NodeId) -> Self { pub fn no_peer_info(node_id: NodeId) -> Self {
Self::NoPeerInfo { node_id } Self::NoPeerInfo { node_id }
@ -2681,6 +2682,13 @@ impl VeilidAPI {
} }
Err(VeilidAPIError::NotInitialized) Err(VeilidAPIError::NotInitialized)
} }
pub fn routing_table(&self) -> Result<RoutingTable, VeilidAPIError> {
let inner = self.inner.lock();
if let Some(context) = &inner.context {
return Ok(context.attachment_manager.network_manager().routing_table());
}
Err(VeilidAPIError::NotInitialized)
}
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// Attach/Detach // Attach/Detach
@ -2732,6 +2740,64 @@ impl VeilidAPI {
RoutingContext::new(self.clone()) RoutingContext::new(self.clone())
} }
////////////////////////////////////////////////////////////////
// Private route allocation
#[instrument(level = "debug", skip(self))]
pub async fn new_default_private_route(&self) -> Result<(DHTKey, Vec<u8>), VeilidAPIError> {
let config = self.config()?;
let c = config.get();
self.new_private_route(
Stability::LowLatency,
Sequencing::NoPreference,
c.network.rpc.default_route_hop_count.into(),
)
.await
}
#[instrument(level = "debug", skip(self))]
pub async fn new_private_route(
&self,
stability: Stability,
sequencing: Sequencing,
hop_count: usize,
) -> Result<(DHTKey, Vec<u8>), VeilidAPIError> {
let rss = self.routing_table()?.route_spec_store();
let r = rss
.allocate_route(
stability,
sequencing,
hop_count,
Direction::Inbound.into(),
&[],
)
.map_err(VeilidAPIError::internal)?;
let Some(pr_pubkey) = r else {
return Err(VeilidAPIError::generic("unable to allocate route"));
};
if !rss
.test_route(&pr_pubkey)
.await
.map_err(VeilidAPIError::no_connection)?
{
rss.release_route(pr_pubkey)
.map_err(VeilidAPIError::generic)?;
return Err(VeilidAPIError::generic("allocated route failed to test"));
}
let private_route = rss
.assemble_private_route(&pr_pubkey, Some(true))
.map_err(VeilidAPIError::generic)?;
let blob = match RouteSpecStore::private_route_to_blob(&private_route) {
Ok(v) => v,
Err(e) => {
rss.release_route(pr_pubkey)
.map_err(VeilidAPIError::generic)?;
return Err(VeilidAPIError::internal(e));
}
};
Ok((pr_pubkey, blob))
}
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// App Calls // App Calls

View File

@ -4,7 +4,7 @@ use super::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Target { pub enum Target {
NodeId(NodeId), NodeId(NodeId),
PrivateRoute(PrivateRoute), PrivateRoute(DHTKey),
} }
pub struct RoutingContextInner {} pub struct RoutingContextInner {}
@ -115,7 +115,7 @@ impl RoutingContext {
// Resolve node // Resolve node
let mut nr = match rpc_processor.resolve_node(node_id.key).await { let mut nr = match rpc_processor.resolve_node(node_id.key).await {
Ok(Some(nr)) => nr, Ok(Some(nr)) => nr,
Ok(None) => return Err(VeilidAPIError::NodeNotFound { node_id }), Ok(None) => return Err(VeilidAPIError::KeyNotFound { key: node_id.key }),
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
}; };
// Apply sequencing to match safety selection // Apply sequencing to match safety selection
@ -126,10 +126,17 @@ impl RoutingContext {
safety_selection: self.unlocked_inner.safety_selection, safety_selection: self.unlocked_inner.safety_selection,
}) })
} }
Target::PrivateRoute(pr) => Ok(rpc_processor::Destination::PrivateRoute { Target::PrivateRoute(pr) => {
private_route: pr, // Get remote private route
let rss = self.api.routing_table()?.route_spec_store();
let private_route = rss
.get_remote_private_route(&pr)
.map_err(|_| VeilidAPIError::KeyNotFound { key: pr })?;
Ok(rpc_processor::Destination::PrivateRoute {
private_route,
safety_selection: self.unlocked_inner.safety_selection, safety_selection: self.unlocked_inner.safety_selection,
}), })
}
} }
} }