more private route work

This commit is contained in:
John Smith 2022-10-13 22:05:43 -04:00
parent 2d526674a5
commit e85d72f21a
13 changed files with 323 additions and 256 deletions

View File

@ -64,6 +64,8 @@ core:
max_timestamp_ahead_ms: 10000 max_timestamp_ahead_ms: 10000
timeout_ms: 10000 timeout_ms: 10000
max_route_hop_count: 7 max_route_hop_count: 7
default_route_hop_count: 2
dht: dht:
resolve_node_timeout: resolve_node_timeout:
resolve_node_count: 20 resolve_node_count: 20

View File

@ -229,6 +229,7 @@ rpc:
max_timestamp_ahead_ms: 10000 max_timestamp_ahead_ms: 10000
timeout_ms: 10000 timeout_ms: 10000
max_route_hop_count: 7 max_route_hop_count: 7
default_route_hop_count: 2
``` ```
#### core:network:dht #### core:network:dht

View File

@ -90,7 +90,7 @@ pub struct RoutingTable {
} }
impl RoutingTable { impl RoutingTable {
fn new_inner() -> RoutingTableInner { fn new_inner(config: VeilidConfig) -> RoutingTableInner {
RoutingTableInner { RoutingTableInner {
buckets: Vec::new(), buckets: Vec::new(),
public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(),
@ -100,7 +100,7 @@ impl RoutingTable {
self_transfer_stats_accounting: TransferStatsAccounting::new(), self_transfer_stats_accounting: TransferStatsAccounting::new(),
self_transfer_stats: TransferStatsDownUp::default(), self_transfer_stats: TransferStatsDownUp::default(),
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
route_spec_store: RouteSpecStore::new(), route_spec_store: RouteSpecStore::new(config),
} }
} }
fn new_unlocked_inner( fn new_unlocked_inner(
@ -121,7 +121,7 @@ impl RoutingTable {
pub fn new(network_manager: NetworkManager) -> Self { pub fn new(network_manager: NetworkManager) -> Self {
let config = network_manager.config(); let config = network_manager.config();
let this = Self { let this = Self {
inner: Arc::new(RwLock::new(Self::new_inner())), inner: Arc::new(RwLock::new(Self::new_inner(config.clone()))),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config, network_manager)), unlocked_inner: Arc::new(Self::new_unlocked_inner(config, network_manager)),
}; };
// Set rolling transfers tick task // Set rolling transfers tick task
@ -217,6 +217,22 @@ impl RoutingTable {
} }
} }
pub fn with_route_spec_store_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut RouteSpecStore) -> R,
{
let inner = self.inner.write();
f(&mut inner.route_spec_store)
}
pub fn with_route_spec_store<F, R>(&self, f: F) -> R
where
F: FnOnce(&RouteSpecStore) -> R,
{
let inner = self.inner.read();
f(&inner.route_spec_store)
}
pub fn relay_node(&self, domain: RoutingDomain) -> Option<NodeRef> { pub fn relay_node(&self, domain: RoutingDomain) -> Option<NodeRef> {
let inner = self.inner.read(); let inner = self.inner.read();
Self::with_routing_domain(&*inner, domain, |rd| rd.common().relay_node()) Self::with_routing_domain(&*inner, domain, |rd| rd.common().relay_node())
@ -564,7 +580,7 @@ impl RoutingTable {
error!("kick_buckets_task not stopped: {}", e); error!("kick_buckets_task not stopped: {}", e);
} }
*self.inner.write() = Self::new_inner(); *self.inner.write() = Self::new_inner(self.unlocked_inner.config.clone());
debug!("finished routing table terminate"); debug!("finished routing table terminate");
} }

View File

@ -2,6 +2,24 @@ use super::*;
use crate::veilid_api::*; use crate::veilid_api::*;
use serde::*; use serde::*;
/// Options for safety routes (sender privacy)
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SafetySpec {
/// 0 = no safety route, just use node's node id, more hops is safer but slower
pub hop_count: usize,
/// prefer more reliable protocols and relays over faster ones
pub reliable: bool,
}
/// Compiled route (safety route + private route)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompiledRoute {
/// The safety route attached to the private route
safety_route: SafetyRoute,
/// The secret used to encrypt the message payload
secret: DHTKeySecret,
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
struct RouteSpecDetail { struct RouteSpecDetail {
/// Secret key /// Secret key
@ -30,6 +48,8 @@ struct RouteSpecDetail {
created_ts: u64, created_ts: u64,
/// Timestamp of when the route was last checked for validity /// Timestamp of when the route was last checked for validity
last_checked_ts: Option<u64>, last_checked_ts: Option<u64>,
/// Timestamp of when the route was last used for anything
last_used_ts: Option<u64>,
/// Directions this route is guaranteed to work in /// Directions this route is guaranteed to work in
directions: DirectionSet, directions: DirectionSet,
/// Reliability /// Reliability
@ -56,10 +76,14 @@ pub struct RouteSpecStoreCache {
#[derive(Debug)] #[derive(Debug)]
pub struct RouteSpecStore { pub struct RouteSpecStore {
/// Maximum number of hops in a route
max_route_hop_count: usize,
/// Default number of hops in a route
default_route_hop_count: usize,
/// Serialize RouteSpecStore content /// Serialize RouteSpecStore content
content: RouteSpecStoreContent, content: RouteSpecStoreContent,
/// RouteSpecStore cache /// RouteSpecStore cache
cache: RouteSpecStoreCache, cache: Mutex<RouteSpecStoreCache>,
} }
fn route_hops_to_hop_cache(hops: &[DHTKey]) -> Vec<u8> { fn route_hops_to_hop_cache(hops: &[DHTKey]) -> Vec<u8> {
@ -145,8 +169,17 @@ where
} }
impl RouteSpecStore { impl RouteSpecStore {
pub fn new() -> Self { pub fn new(config: VeilidConfig) -> Self {
let (max_route_hop_count, default_route_hop_count) = {
let c = config.get();
let max_route_hop_count = c.network.rpc.max_route_hop_count;
let default_route_hop_count = c.network.rpc.max_route_hop_count;
(max_route_hop_count.into(), default_route_hop_count.into())
};
Self { Self {
max_route_hop_count,
default_route_hop_count,
content: RouteSpecStoreContent { content: RouteSpecStoreContent {
details: HashMap::new(), details: HashMap::new(),
}, },
@ -155,11 +188,20 @@ impl RouteSpecStore {
} }
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> { pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> {
let (max_route_hop_count, default_route_hop_count) = {
let c = routing_table.unlocked_inner.config.get();
let max_route_hop_count = c.network.rpc.max_route_hop_count;
let default_route_hop_count = c.network.rpc.max_route_hop_count;
(max_route_hop_count.into(), default_route_hop_count.into())
};
// Get cbor blob from table store // Get cbor blob from table store
let table_store = routing_table.network_manager().table_store(); let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?; let rsstdb = table_store.open("RouteSpecStore", 1).await?;
let content = rsstdb.load_cbor(0, b"content").await?.unwrap_or_default(); let content = rsstdb.load_cbor(0, b"content").await?.unwrap_or_default();
let mut rss = RouteSpecStore { let mut rss = RouteSpecStore {
max_route_hop_count,
default_route_hop_count,
content, content,
cache: Default::default(), cache: Default::default(),
}; };
@ -259,18 +301,11 @@ impl RouteSpecStore {
) -> EyreResult<Option<DHTKey>> { ) -> EyreResult<Option<DHTKey>> {
use core::cmp::Ordering; use core::cmp::Ordering;
let max_route_hop_count = { if hop_count < 1 {
let config = routing_table.network_manager().config(); bail!("Not allocating route less than one hop in length");
let c = config.get();
let max_route_hop_count = c.network.rpc.max_route_hop_count;
max_route_hop_count.into()
};
if hop_count < 2 {
bail!("Not allocating route less than two hops in length");
} }
if hop_count > max_route_hop_count { if hop_count > self.max_route_hop_count {
bail!("Not allocating route longer than max route hop count"); bail!("Not allocating route longer than max route hop count");
} }
@ -497,6 +532,7 @@ impl RouteSpecStore {
published: false, published: false,
created_ts: cur_ts, created_ts: cur_ts,
last_checked_ts: None, last_checked_ts: None,
last_used_ts: None,
directions, directions,
reliable, reliable,
}; };
@ -568,8 +604,117 @@ impl RouteSpecStore {
None None
} }
/// xxx add route compiler here //////////////////////////////////////////////////////////////////////
///
/// Compiles a safety route to the private route, with caching
pub fn compile_safety_route(
&self,
safety_spec: SafetySpec,
private_route: PrivateRoute,
) -> Result<CompiledRoute, RPCError> {
// xxx implement caching first!
// xxx implement, ensure we handle hops == 0 for our safetyspec
// Ensure the total hop count isn't too long for our config
let pr_hopcount = private_route.hop_count as usize;
let sr_hopcount = safety_route_spec.hops.len();
let hopcount = 1 + sr_hopcount + pr_hopcount;
if hopcount > self.max_route_hop_count {
return Err(RPCError::internal("hop count too long for route"));
}
// Create hops
let hops = if sr_hopcount == 0 {
SafetyRouteHops::Private(private_route)
} else {
// start last blob-to-encrypt data off as private route
let mut blob_data = {
let mut pr_message = ::capnp::message::Builder::new_default();
let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>();
encode_private_route(&private_route, &mut pr_builder)?;
let mut blob_data = builder_to_vec(pr_message)?;
// append the private route tag so we know how to decode it later
blob_data.push(1u8);
blob_data
};
// Encode each hop from inside to outside
// skips the outermost hop since that's entering the
// safety route and does not include the dialInfo
// (outer hop is a RouteHopData, not a RouteHop).
// Each loop mutates 'nonce', and 'blob_data'
let mut nonce = Crypto::get_random_nonce();
for h in (1..sr_hopcount).rev() {
// Get blob to encrypt for next hop
blob_data = {
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
let dh_secret = self
.crypto
.cached_dh(
&safety_route_spec.hops[h].dial_info.node_id.key,
&safety_route_spec.secret_key,
)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data =
Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
// Make route hop data
let route_hop_data = RouteHopData {
nonce,
blob: enc_msg_data,
};
// Make route hop
let route_hop = RouteHop {
dial_info: safety_route_spec.hops[h].dial_info.clone(),
next_hop: Some(route_hop_data),
};
// Make next blob from route hop
let mut rh_message = ::capnp::message::Builder::new_default();
let mut rh_builder = rh_message.init_root::<veilid_capnp::route_hop::Builder>();
encode_route_hop(&route_hop, &mut rh_builder)?;
let mut blob_data = builder_to_vec(rh_message)?;
// Append the route hop tag so we know how to decode it later
blob_data.push(0u8);
blob_data
};
// Make another nonce for the next hop
nonce = Crypto::get_random_nonce();
}
// Encode first RouteHopData
let dh_secret = self
.crypto
.cached_dh(
&safety_route_spec.hops[0].dial_info.node_id.key,
&safety_route_spec.secret_key,
)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data = Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
let route_hop_data = RouteHopData {
nonce,
blob: enc_msg_data,
};
SafetyRouteHops::Data(route_hop_data)
};
// Build safety route
let safety_route = SafetyRoute {
public_key: safety_route_spec.public_key,
hop_count: safety_route_spec.hops.len() as u8,
hops,
};
Ok(safety_route)
}
/// Mark route as published /// Mark route as published
/// When first deserialized, routes must be re-published in order to ensure they remain /// When first deserialized, routes must be re-published in order to ensure they remain
@ -583,6 +728,11 @@ impl RouteSpecStore {
self.detail_mut(&key).last_checked_ts = Some(cur_ts); self.detail_mut(&key).last_checked_ts = Some(cur_ts);
} }
/// Mark route as used
pub fn touch_route_used(&mut self, key: &DHTKey, cur_ts: u64) {
self.detail_mut(&key).last_used_ts = Some(cur_ts);
}
/// Record latency on the route /// Record latency on the route
pub fn record_latency(&mut self, key: &DHTKey, latency: u64) { pub fn record_latency(&mut self, key: &DHTKey, latency: u64) {
let lsa = &mut self.detail_mut(&key).latency_stats_accounting; let lsa = &mut self.detail_mut(&key).latency_stats_accounting;

View File

@ -8,6 +8,7 @@ pub enum Destination {
/// The node to send to /// The node to send to
target: NodeRef, target: NodeRef,
/// Require safety route or not /// Require safety route or not
xxx convert back to safety spec, bubble up to api
safety: bool, safety: bool,
}, },
/// Send to node for relay purposes /// Send to node for relay purposes

View File

@ -134,8 +134,8 @@ impl<T> Answer<T> {
struct RenderedOperation { struct RenderedOperation {
message: Vec<u8>, // The rendered operation bytes message: Vec<u8>, // The rendered operation bytes
node_id: DHTKey, // Node id we're sending to node_id: DHTKey, // Destination node id we're sending to
node_ref: Option<NodeRef>, // Node to send envelope to (may not be destination node id in case of relay) node_ref: NodeRef, // Node to send envelope to (may not be destination node id in case of relay)
hop_count: usize, // Total safety + private route hop count hop_count: usize, // Total safety + private route hop count
} }
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
@ -151,6 +151,7 @@ pub struct RPCProcessorUnlockedInner {
queue_size: u32, queue_size: u32,
concurrency: u32, concurrency: u32,
max_route_hop_count: usize, max_route_hop_count: usize,
default_route_hop_count: usize,
validate_dial_info_receipt_time_ms: u32, validate_dial_info_receipt_time_ms: u32,
update_callback: UpdateCallback, update_callback: UpdateCallback,
waiting_rpc_table: OperationWaiter<RPCMessage>, waiting_rpc_table: OperationWaiter<RPCMessage>,
@ -187,21 +188,13 @@ impl RPCProcessor {
let mut queue_size = c.network.rpc.queue_size; let mut queue_size = c.network.rpc.queue_size;
let mut timeout = ms_to_us(c.network.rpc.timeout_ms); let mut timeout = ms_to_us(c.network.rpc.timeout_ms);
let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize; let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize;
let mut default_route_hop_count = c.network.rpc.default_route_hop_count as usize;
if concurrency == 0 { if concurrency == 0 {
concurrency = intf::get_concurrency() / 2; concurrency = intf::get_concurrency() / 2;
if concurrency == 0 { if concurrency == 0 {
concurrency = 1; concurrency = 1;
} }
} }
if queue_size == 0 {
queue_size = 1024;
}
if timeout == 0 {
timeout = 10000000;
}
if max_route_hop_count == 0 {
max_route_hop_count = 7usize;
}
let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms; let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms;
RPCProcessorUnlockedInner { RPCProcessorUnlockedInner {
@ -209,6 +202,7 @@ impl RPCProcessor {
queue_size, queue_size,
concurrency, concurrency,
max_route_hop_count, max_route_hop_count,
default_route_hop_count,
validate_dial_info_receipt_time_ms, validate_dial_info_receipt_time_ms,
update_callback, update_callback,
waiting_rpc_table: OperationWaiter::new(), waiting_rpc_table: OperationWaiter::new(),
@ -403,6 +397,65 @@ impl RPCProcessor {
out out
} }
// Wrap an operation with a private route inside a safety route
pub(super) fn wrap_with_route(
&self,
safety_spec: SafetySpec,
private_route: PrivateRoute,
message_data: Vec<u8>,
) -> Result<RenderedOperation, RPCError> {
let compiled_route: CompiledRoute = self.routing_table().with_route_spec_store(|rss| {
// Compile the safety route with the private route
rss.compile_safety_route(safety_spec, private_route)
})?;
// Verify hop count isn't larger than out maximum routed hop count
if compiled_route.safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count
{
return Err(RPCError::internal("hop count too long for route"))
.map_err(logthru_rpc!(warn));
}
// Encrypt routed operation
// Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
let nonce = Crypto::get_random_nonce();
let dh_secret = self
.crypto
.cached_dh(&private_route.public_key, &compiled_route.secret)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data = Crypto::encrypt_aead(&message_data, &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
// Make the routed operation
let operation = RoutedOperation::new(nonce, enc_msg_data);
// Prepare route operation
let route = RPCOperationRoute {
safety_route,
operation,
};
let operation =
RPCOperation::new_statement(RPCStatement::new(RPCStatementDetail::Route(route)), None);
// Convert message to bytes and return it
let mut route_msg = ::capnp::message::Builder::new_default();
let mut route_operation = route_msg.init_root::<veilid_capnp::operation::Builder>();
operation.encode(&mut route_operation)?;
let out = builder_to_vec(route_msg)?;
// out_node_id = sr
// .hops
// .first()
// .ok_or_else(RPCError::else_internal("no hop in safety route"))?
// .dial_info
// .node_id
// .key;
//out_hop_count = 1 + sr.hops.len();
Ok(out)
}
/// Produce a byte buffer that represents the wire encoding of the entire /// Produce a byte buffer that represents the wire encoding of the entire
/// unencrypted envelope body for a RPC message. This incorporates /// unencrypted envelope body for a RPC message. This incorporates
/// wrapping a private and/or safety route if they are specified. /// wrapping a private and/or safety route if they are specified.
@ -412,14 +465,11 @@ impl RPCProcessor {
dest: Destination, dest: Destination,
operation: &RPCOperation, operation: &RPCOperation,
) -> Result<RenderedOperation, RPCError> { ) -> Result<RenderedOperation, RPCError> {
let out_node_id; // Envelope Node Id let out: RenderedOperation;
let mut out_node_ref: Option<NodeRef> = None; // Node to send envelope to
let out_hop_count: usize; // Total safety + private route hop count
let out_message; // Envelope data
// Encode message to a builder and make a message reader for it // Encode message to a builder and make a message reader for it
// Then produce the message as an unencrypted byte buffer // Then produce the message as an unencrypted byte buffer
let message_vec = { let message = {
let mut msg_builder = ::capnp::message::Builder::new_default(); let mut msg_builder = ::capnp::message::Builder::new_default();
let mut op_builder = msg_builder.init_root::<veilid_capnp::operation::Builder>(); let mut op_builder = msg_builder.init_root::<veilid_capnp::operation::Builder>();
operation.encode(&mut op_builder)?; operation.encode(&mut op_builder)?;
@ -430,12 +480,12 @@ impl RPCProcessor {
match dest { match dest {
Destination::Direct { Destination::Direct {
target: ref node_ref, target: ref node_ref,
ref safety_route_spec, safety,
} }
| Destination::Relay { | Destination::Relay {
relay: ref node_ref, relay: ref node_ref,
target: _, target: _,
ref safety_route_spec, safety,
} => { } => {
// Send to a node without a private route // Send to a node without a private route
// -------------------------------------- // --------------------------------------
@ -444,7 +494,7 @@ impl RPCProcessor {
let (node_ref, node_id) = if let Destination::Relay { let (node_ref, node_id) = if let Destination::Relay {
relay: _, relay: _,
target: ref dht_key, target: ref dht_key,
safety_route_spec: _, safety: _,
} = dest } = dest
{ {
(node_ref.clone(), dht_key.clone()) (node_ref.clone(), dht_key.clone())
@ -454,83 +504,40 @@ impl RPCProcessor {
}; };
// Handle the existence of safety route // Handle the existence of safety route
match safety_route_spec.as_ref() { match safety {
None => { false => {
// If no safety route is being used, and we're not sending to a private // If no safety route is being used, and we're not sending to a private
// route, we can use a direct envelope instead of routing // route, we can use a direct envelope instead of routing
out_message = message_vec; out = RenderedOperation {
message,
// Message goes directly to the node node_id,
out_node_id = node_id; node_ref,
out_node_ref = Some(node_ref); hop_count: 1,
out_hop_count = 1; };
} }
Some(sr) => { true => {
// No private route was specified for the request // No private route was specified for the request
// but we are using a safety route, so we must create an empty private route // but we are using a safety route, so we must create an empty private route
let private_route = PrivateRoute::new_stub(node_id); let private_route = PrivateRoute::new_stub(node_id);
// first // Wrap with safety route
out_node_id = sr out = self.wrap_with_route(true, private_route, message)?;
.hops
.first()
.ok_or_else(RPCError::else_internal("no hop in safety route"))?
.dial_info
.node_id
.key;
out_message =
self.wrap_with_route(Some(sr.clone()), private_route, message_vec)?;
out_hop_count = 1 + sr.hops.len();
} }
}; };
} }
Destination::PrivateRoute { Destination::PrivateRoute {
private_route, private_route,
safety_route_spec, safety,
reliable,
} => { } => {
// Send to private route // Send to private route
// --------------------- // ---------------------
// Reply with 'route' operation // Reply with 'route' operation
out_node_id = match safety_route_spec { out = self.wrap_with_route(safety, private_route, message)?;
None => {
// If no safety route, the first node is the first hop of the private route
out_hop_count = private_route.hop_count as usize;
let out_node_id = match &private_route.hops {
Some(rh) => rh.dial_info.node_id.key,
_ => return Err(RPCError::internal("private route has no hops")),
};
out_message = self.wrap_with_route(None, private_route, message_vec)?;
out_node_id
}
Some(sr) => {
// If safety route is in use, first node is the first hop of the safety route
out_hop_count = 1 + sr.hops.len() + (private_route.hop_count as usize);
let out_node_id = sr
.hops
.first()
.ok_or_else(RPCError::else_internal("no hop in safety route"))?
.dial_info
.node_id
.key;
out_message = self.wrap_with_route(Some(sr), private_route, message_vec)?;
out_node_id
}
}
} }
} }
// Verify hop count isn't larger than out maximum routed hop count Ok(out)
if out_hop_count > self.unlocked_inner.max_route_hop_count {
return Err(RPCError::internal("hop count too long for route"))
.map_err(logthru_rpc!(warn));
}
Ok(RenderedOperation {
message: out_message,
node_id: out_node_id,
node_ref: out_node_ref,
hop_count: out_hop_count,
})
} }
// Get signed node info to package with RPC messages to improve // Get signed node info to package with RPC messages to improve

View File

@ -1,154 +0,0 @@
use super::*;
impl RPCProcessor {
xxx move this into route spec store
//////////////////////////////////////////////////////////////////////
fn compile_safety_route(
&self,
safety_route_spec: Arc<SafetyRouteSpec>,
private_route: PrivateRoute,
) -> Result<SafetyRoute, RPCError> {
// Ensure the total hop count isn't too long for our config
let pr_hopcount = private_route.hop_count as usize;
let sr_hopcount = safety_route_spec.hops.len();
let hopcount = 1 + sr_hopcount + pr_hopcount;
if hopcount > self.unlocked_inner.max_route_hop_count {
return Err(RPCError::internal("hop count too long for route"));
}
// Create hops
let hops = if sr_hopcount == 0 {
SafetyRouteHops::Private(private_route)
} else {
// start last blob-to-encrypt data off as private route
let mut blob_data = {
let mut pr_message = ::capnp::message::Builder::new_default();
let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>();
encode_private_route(&private_route, &mut pr_builder)?;
let mut blob_data = builder_to_vec(pr_message)?;
// append the private route tag so we know how to decode it later
blob_data.push(1u8);
blob_data
};
// Encode each hop from inside to outside
// skips the outermost hop since that's entering the
// safety route and does not include the dialInfo
// (outer hop is a RouteHopData, not a RouteHop).
// Each loop mutates 'nonce', and 'blob_data'
let mut nonce = Crypto::get_random_nonce();
for h in (1..sr_hopcount).rev() {
// Get blob to encrypt for next hop
blob_data = {
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
let dh_secret = self
.crypto
.cached_dh(
&safety_route_spec.hops[h].dial_info.node_id.key,
&safety_route_spec.secret_key,
)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data =
Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
// Make route hop data
let route_hop_data = RouteHopData {
nonce,
blob: enc_msg_data,
};
// Make route hop
let route_hop = RouteHop {
dial_info: safety_route_spec.hops[h].dial_info.clone(),
next_hop: Some(route_hop_data),
};
// Make next blob from route hop
let mut rh_message = ::capnp::message::Builder::new_default();
let mut rh_builder = rh_message.init_root::<veilid_capnp::route_hop::Builder>();
encode_route_hop(&route_hop, &mut rh_builder)?;
let mut blob_data = builder_to_vec(rh_message)?;
// Append the route hop tag so we know how to decode it later
blob_data.push(0u8);
blob_data
};
// Make another nonce for the next hop
nonce = Crypto::get_random_nonce();
}
// Encode first RouteHopData
let dh_secret = self
.crypto
.cached_dh(
&safety_route_spec.hops[0].dial_info.node_id.key,
&safety_route_spec.secret_key,
)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data = Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
let route_hop_data = RouteHopData {
nonce,
blob: enc_msg_data,
};
SafetyRouteHops::Data(route_hop_data)
};
// Build safety route
let safety_route = SafetyRoute {
public_key: safety_route_spec.public_key,
hop_count: safety_route_spec.hops.len() as u8,
hops,
};
Ok(safety_route)
}
// Wrap an operation inside a route
pub(super) fn wrap_with_route(
&self,
safety_route_spec: Option<Arc<SafetyRouteSpec>>,
private_route: PrivateRoute,
message_data: Vec<u8>,
) -> Result<Vec<u8>, RPCError> {
// Encrypt routed operation
// Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
let nonce = Crypto::get_random_nonce();
let safety_route_spec =
safety_route_spec.unwrap_or_else(|| Arc::new(SafetyRouteSpec::new()));
let dh_secret = self
.crypto
.cached_dh(&private_route.public_key, &safety_route_spec.secret_key)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data = Crypto::encrypt_aead(&message_data, &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
// Compile the safety route with the private route
let safety_route = self.compile_safety_route(safety_route_spec, private_route)?;
// Make the routed operation
let operation = RoutedOperation::new(nonce, enc_msg_data);
// Prepare route operation
let route = RPCOperationRoute {
safety_route,
operation,
};
let operation =
RPCOperation::new_statement(RPCStatement::new(RPCStatementDetail::Route(route)), None);
// Convert message to bytes and return it
let mut route_msg = ::capnp::message::Builder::new_default();
let mut route_operation = route_msg.init_root::<veilid_capnp::operation::Builder>();
operation.encode(&mut route_operation)?;
let out = builder_to_vec(route_msg)?;
Ok(out)
}
}

View File

@ -208,6 +208,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
"network.rpc.max_timestamp_ahead_ms" => Ok(Box::new(Some(10_000u32))), "network.rpc.max_timestamp_ahead_ms" => Ok(Box::new(Some(10_000u32))),
"network.rpc.timeout_ms" => Ok(Box::new(10_000u32)), "network.rpc.timeout_ms" => Ok(Box::new(10_000u32)),
"network.rpc.max_route_hop_count" => Ok(Box::new(7u8)), "network.rpc.max_route_hop_count" => Ok(Box::new(7u8)),
"network.rpc.default_route_hop_count" => Ok(Box::new(2u8)),
"network.dht.resolve_node_timeout_ms" => Ok(Box::new(Option::<u32>::None)), "network.dht.resolve_node_timeout_ms" => Ok(Box::new(Option::<u32>::None)),
"network.dht.resolve_node_count" => Ok(Box::new(20u32)), "network.dht.resolve_node_count" => Ok(Box::new(20u32)),
"network.dht.resolve_node_fanout" => Ok(Box::new(3u32)), "network.dht.resolve_node_fanout" => Ok(Box::new(3u32)),
@ -325,6 +326,7 @@ pub async fn test_config() {
assert_eq!(inner.network.rpc.queue_size, 128u32); assert_eq!(inner.network.rpc.queue_size, 128u32);
assert_eq!(inner.network.rpc.timeout_ms, 10_000u32); assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);
assert_eq!(inner.network.rpc.max_route_hop_count, 7u8); assert_eq!(inner.network.rpc.max_route_hop_count, 7u8);
assert_eq!(inner.network.rpc.default_route_hop_count, 2u8);
assert_eq!(inner.network.routing_table.limit_over_attached, 64u32); assert_eq!(inner.network.routing_table.limit_over_attached, 64u32);
assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32); assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32);
assert_eq!(inner.network.routing_table.limit_attached_strong, 16u32); assert_eq!(inner.network.routing_table.limit_attached_strong, 16u32);

View File

@ -192,6 +192,7 @@ pub struct VeilidConfigRPC {
pub max_timestamp_ahead_ms: Option<u32>, pub max_timestamp_ahead_ms: Option<u32>,
pub timeout_ms: u32, pub timeout_ms: u32,
pub max_route_hop_count: u8, pub max_route_hop_count: u8,
pub default_route_hop_count: u8,
} }
/// Configure the network routing table /// Configure the network routing table
@ -444,6 +445,7 @@ impl VeilidConfig {
get_config!(inner.network.rpc.max_timestamp_ahead_ms); get_config!(inner.network.rpc.max_timestamp_ahead_ms);
get_config!(inner.network.rpc.timeout_ms); get_config!(inner.network.rpc.timeout_ms);
get_config!(inner.network.rpc.max_route_hop_count); get_config!(inner.network.rpc.max_route_hop_count);
get_config!(inner.network.rpc.default_route_hop_count);
get_config!(inner.network.upnp); get_config!(inner.network.upnp);
get_config!(inner.network.natpmp); get_config!(inner.network.natpmp);
get_config!(inner.network.detect_address_changes); get_config!(inner.network.detect_address_changes);
@ -634,6 +636,33 @@ impl VeilidConfig {
); );
} }
} }
if inner.network.rpc.max_route_hop_count == 0 {
apibail_generic!(
"max route hop count must be >= 1 in 'network.rpc.max_route_hop_count'"
);
}
if inner.network.rpc.max_route_hop_count > 7 {
apibail_generic!(
"max route hop count must be <= 7 in 'network.rpc.max_route_hop_count'"
);
}
if inner.network.rpc.default_route_hop_count == 0 {
apibail_generic!(
"default route hop count must be >= 1 in 'network.rpc.default_route_hop_count'"
);
}
if inner.network.rpc.default_route_hop_count > inner.network.rpc.max_route_hop_count {
apibail_generic!(
"default route hop count must be <= max route hop count in 'network.rpc.default_route_hop_count <= network.rpc.max_route_hop_count'"
);
}
if inner.network.rpc.queue_size < 256 {
apibail_generic!("rpc queue size must be >= 256 in 'network.rpc.queue_size'");
}
if inner.network.rpc.timeout_ms < 1000 {
apibail_generic!("rpc timeout must be >= 1000 in 'network.rpc.timeout_ms'");
}
Ok(()) Ok(())
} }

View File

@ -66,6 +66,7 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
maxTimestampAheadMs: 10000, maxTimestampAheadMs: 10000,
timeoutMs: 10000, timeoutMs: 10000,
maxRouteHopCount: 7, maxRouteHopCount: 7,
defaultRouteHopCount: 2,
), ),
dht: VeilidConfigDHT( dht: VeilidConfigDHT(
resolveNodeTimeoutMs: null, resolveNodeTimeoutMs: null,

View File

@ -657,6 +657,7 @@ class VeilidConfigRPC {
int? maxTimestampAheadMs; int? maxTimestampAheadMs;
int timeoutMs; int timeoutMs;
int maxRouteHopCount; int maxRouteHopCount;
int defaultRouteHopCount;
VeilidConfigRPC( VeilidConfigRPC(
{required this.concurrency, {required this.concurrency,
@ -664,7 +665,8 @@ class VeilidConfigRPC {
this.maxTimestampBehindMs, this.maxTimestampBehindMs,
this.maxTimestampAheadMs, this.maxTimestampAheadMs,
required this.timeoutMs, required this.timeoutMs,
required this.maxRouteHopCount}); required this.maxRouteHopCount,
required this.defaultRouteHopCount});
Map<String, dynamic> get json { Map<String, dynamic> get json {
return { return {
@ -674,6 +676,7 @@ class VeilidConfigRPC {
'max_timestamp_ahead_ms': maxTimestampAheadMs, 'max_timestamp_ahead_ms': maxTimestampAheadMs,
'timeout_ms': timeoutMs, 'timeout_ms': timeoutMs,
'max_route_hop_count': maxRouteHopCount, 'max_route_hop_count': maxRouteHopCount,
'default_route_hop_count': defaultRouteHopCount,
}; };
} }
@ -683,7 +686,8 @@ class VeilidConfigRPC {
maxTimestampBehindMs = json['max_timestamp_behind_ms'], maxTimestampBehindMs = json['max_timestamp_behind_ms'],
maxTimestampAheadMs = json['max_timestamp_ahead_ms'], maxTimestampAheadMs = json['max_timestamp_ahead_ms'],
timeoutMs = json['timeout_ms'], timeoutMs = json['timeout_ms'],
maxRouteHopCount = json['max_route_hop_count']; maxRouteHopCount = json['max_route_hop_count'],
defaultRouteHopCount = json['default_route_hop_count'];
} }
//////////// ////////////

View File

@ -82,6 +82,7 @@ core:
max_timestamp_ahead_ms: 10000 max_timestamp_ahead_ms: 10000
timeout_ms: 10000 timeout_ms: 10000
max_route_hop_count: 7 max_route_hop_count: 7
default_route_hop_count: 2
dht: dht:
resolve_node_timeout: resolve_node_timeout:
resolve_node_count: 20 resolve_node_count: 20
@ -539,6 +540,7 @@ pub struct Rpc {
pub max_timestamp_ahead_ms: Option<u32>, pub max_timestamp_ahead_ms: Option<u32>,
pub timeout_ms: u32, pub timeout_ms: u32,
pub max_route_hop_count: u8, pub max_route_hop_count: u8,
pub default_route_hop_count: u8,
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
@ -965,6 +967,7 @@ impl Settings {
set_config_value!(inner.core.network.rpc.max_timestamp_ahead_ms, value); set_config_value!(inner.core.network.rpc.max_timestamp_ahead_ms, value);
set_config_value!(inner.core.network.rpc.timeout_ms, value); set_config_value!(inner.core.network.rpc.timeout_ms, value);
set_config_value!(inner.core.network.rpc.max_route_hop_count, value); set_config_value!(inner.core.network.rpc.max_route_hop_count, value);
set_config_value!(inner.core.network.rpc.default_route_hop_count, value);
set_config_value!(inner.core.network.dht.resolve_node_timeout_ms, value); set_config_value!(inner.core.network.dht.resolve_node_timeout_ms, value);
set_config_value!(inner.core.network.dht.resolve_node_count, value); set_config_value!(inner.core.network.dht.resolve_node_count, value);
set_config_value!(inner.core.network.dht.resolve_node_fanout, value); set_config_value!(inner.core.network.dht.resolve_node_fanout, value);
@ -1142,6 +1145,9 @@ impl Settings {
"network.rpc.max_route_hop_count" => { "network.rpc.max_route_hop_count" => {
Ok(Box::new(inner.core.network.rpc.max_route_hop_count)) Ok(Box::new(inner.core.network.rpc.max_route_hop_count))
} }
"network.rpc.default_route_hop_count" => {
Ok(Box::new(inner.core.network.rpc.default_route_hop_count))
}
"network.dht.resolve_node_timeout_ms" => { "network.dht.resolve_node_timeout_ms" => {
Ok(Box::new(inner.core.network.dht.resolve_node_timeout_ms)) Ok(Box::new(inner.core.network.dht.resolve_node_timeout_ms))
} }
@ -1486,6 +1492,7 @@ mod tests {
assert_eq!(s.core.network.rpc.max_timestamp_ahead_ms, Some(10_000u32)); assert_eq!(s.core.network.rpc.max_timestamp_ahead_ms, Some(10_000u32));
assert_eq!(s.core.network.rpc.timeout_ms, 10_000u32); assert_eq!(s.core.network.rpc.timeout_ms, 10_000u32);
assert_eq!(s.core.network.rpc.max_route_hop_count, 7); assert_eq!(s.core.network.rpc.max_route_hop_count, 7);
assert_eq!(s.core.network.rpc.default_route_hop_count, 2);
// //
assert_eq!(s.core.network.dht.resolve_node_timeout_ms, None); assert_eq!(s.core.network.dht.resolve_node_timeout_ms, None);
assert_eq!(s.core.network.dht.resolve_node_count, 20u32); assert_eq!(s.core.network.dht.resolve_node_count, 20u32);

View File

@ -46,6 +46,7 @@ fn init_callbacks() {
case "network.rpc.max_timestamp_ahead": return 10000000; case "network.rpc.max_timestamp_ahead": return 10000000;
case "network.rpc.timeout": return 10000000; case "network.rpc.timeout": return 10000000;
case "network.rpc.max_route_hop_count": return 7; case "network.rpc.max_route_hop_count": return 7;
case "network.rpc.default_route_hop_count": return 2;
case "network.dht.resolve_node_timeout": return null; case "network.dht.resolve_node_timeout": return null;
case "network.dht.resolve_node_count": return 20; case "network.dht.resolve_node_count": return 20;
case "network.dht.resolve_node_fanout": return 3; case "network.dht.resolve_node_fanout": return 3;