checkpoint
This commit is contained in:
parent
e85d72f21a
commit
63768580c6
@ -63,7 +63,7 @@ core:
|
||||
max_timestamp_behind_ms: 10000
|
||||
max_timestamp_ahead_ms: 10000
|
||||
timeout_ms: 10000
|
||||
max_route_hop_count: 7
|
||||
max_route_hop_count: 5
|
||||
default_route_hop_count: 2
|
||||
|
||||
dht:
|
||||
|
@ -228,7 +228,7 @@ rpc:
|
||||
max_timestamp_behind_ms: 10000
|
||||
max_timestamp_ahead_ms: 10000
|
||||
timeout_ms: 10000
|
||||
max_route_hop_count: 7
|
||||
max_route_hop_count: 4
|
||||
default_route_hop_count: 2
|
||||
```
|
||||
|
||||
|
@ -218,17 +218,36 @@ impl RoutingTable {
|
||||
) -> core::cmp::Ordering,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let inner = self.inner.read();
|
||||
let inner = &*inner;
|
||||
let self_node_id = self.unlocked_inner.node_id;
|
||||
let inner = &*self.inner.read();
|
||||
Self::find_peers_with_sort_and_filter_inner(
|
||||
inner, node_count, cur_ts, filter, compare, transform,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn find_peers_with_sort_and_filter_inner<'a, 'b, F, C, T, O>(
|
||||
inner: &RoutingTableInner,
|
||||
node_count: usize,
|
||||
cur_ts: u64,
|
||||
mut filter: F,
|
||||
compare: C,
|
||||
mut transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
F: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> bool,
|
||||
C: FnMut(
|
||||
&'a RoutingTableInner,
|
||||
&'b (DHTKey, Option<Arc<BucketEntry>>),
|
||||
&'b (DHTKey, Option<Arc<BucketEntry>>),
|
||||
) -> core::cmp::Ordering,
|
||||
T: FnMut(&'a RoutingTableInner, DHTKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
// collect all the nodes for sorting
|
||||
let mut nodes =
|
||||
Vec::<(DHTKey, Option<Arc<BucketEntry>>)>::with_capacity(inner.bucket_entry_count + 1);
|
||||
|
||||
// add our own node (only one of there with the None entry)
|
||||
if filter(inner, self_node_id, None) {
|
||||
nodes.push((self_node_id, None));
|
||||
if filter(inner, inner.node_id, None) {
|
||||
nodes.push((inner.node_id, None));
|
||||
}
|
||||
|
||||
// add all nodes from buckets
|
||||
|
@ -219,18 +219,18 @@ impl RoutingTable {
|
||||
|
||||
pub fn with_route_spec_store_mut<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut RouteSpecStore) -> R,
|
||||
F: FnOnce(&mut RouteSpecStore, &mut RoutingTableInner) -> R,
|
||||
{
|
||||
let inner = self.inner.write();
|
||||
f(&mut inner.route_spec_store)
|
||||
let inner = &mut *self.inner.write();
|
||||
f(&mut inner.route_spec_store, inner)
|
||||
}
|
||||
|
||||
pub fn with_route_spec_store<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&RouteSpecStore) -> R,
|
||||
F: FnOnce(&RouteSpecStore, &RoutingTableInner) -> R,
|
||||
{
|
||||
let inner = self.inner.read();
|
||||
f(&inner.route_spec_store)
|
||||
let inner = &*self.inner.read();
|
||||
f(&inner.route_spec_store, inner)
|
||||
}
|
||||
|
||||
pub fn relay_node(&self, domain: RoutingDomain) -> Option<NodeRef> {
|
||||
@ -339,6 +339,30 @@ impl RoutingTable {
|
||||
true
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(inner), ret)]
|
||||
fn get_contact_method_inner(
|
||||
inner: &RoutingTableInner,
|
||||
routing_domain: RoutingDomain,
|
||||
node_a_id: &DHTKey,
|
||||
node_a: &NodeInfo,
|
||||
node_b_id: &DHTKey,
|
||||
node_b: &NodeInfo,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
reliable: bool,
|
||||
) -> ContactMethod {
|
||||
Self::with_routing_domain(inner, routing_domain, |rdd| {
|
||||
rdd.get_contact_method(
|
||||
inner,
|
||||
node_a_id,
|
||||
node_a,
|
||||
node_b_id,
|
||||
node_b,
|
||||
dial_info_filter,
|
||||
reliable,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Look up the best way for two nodes to reach each other over a specific routing domain
|
||||
#[instrument(level = "trace", skip(self), ret)]
|
||||
pub fn get_contact_method(
|
||||
@ -352,17 +376,16 @@ impl RoutingTable {
|
||||
reliable: bool,
|
||||
) -> ContactMethod {
|
||||
let inner = &*self.inner.read();
|
||||
Self::with_routing_domain(inner, routing_domain, |rdd| {
|
||||
rdd.get_contact_method(
|
||||
inner,
|
||||
node_a_id,
|
||||
node_a,
|
||||
node_b_id,
|
||||
node_b,
|
||||
dial_info_filter,
|
||||
reliable,
|
||||
)
|
||||
})
|
||||
Self::get_contact_method_inner(
|
||||
inner,
|
||||
routing_domain,
|
||||
node_a_id,
|
||||
node_a,
|
||||
node_b_id,
|
||||
node_b,
|
||||
dial_info_filter,
|
||||
reliable,
|
||||
)
|
||||
}
|
||||
|
||||
// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access
|
||||
@ -384,11 +407,11 @@ impl RoutingTable {
|
||||
};
|
||||
|
||||
// Node A is our own node
|
||||
let node_a = self.get_own_node_info(routing_domain);
|
||||
let node_a = get_own_node_info_inner(inner, routing_domain);
|
||||
let node_a_id = self.node_id();
|
||||
|
||||
// Node B is the target node
|
||||
let node_b = target_node_ref.operate(|_rti, e| e.node_info(routing_domain).unwrap());
|
||||
let node_b = target_node_ref.xxx operate(|_rti, e| e.node_info(routing_domain).unwrap());
|
||||
let node_b_id = target_node_ref.node_id();
|
||||
|
||||
// Dial info filter comes from the target node ref
|
||||
@ -411,8 +434,7 @@ impl RoutingTable {
|
||||
ContactMethod::Existing => NodeContactMethod::Existing,
|
||||
ContactMethod::Direct(di) => NodeContactMethod::Direct(di),
|
||||
ContactMethod::SignalReverse(relay_key, target_key) => {
|
||||
let relay_nr = self
|
||||
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
|
||||
let relay_nr = Self::lookup_and_filter_noderef_inner(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter)
|
||||
.ok_or_else(|| eyre!("couldn't look up relay"))?;
|
||||
if target_node_ref.node_id() != target_key {
|
||||
bail!("target noderef didn't match target key");
|
||||
@ -420,8 +442,7 @@ impl RoutingTable {
|
||||
NodeContactMethod::SignalReverse(relay_nr, target_node_ref)
|
||||
}
|
||||
ContactMethod::SignalHolePunch(relay_key, target_key) => {
|
||||
let relay_nr = self
|
||||
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
|
||||
let relay_nr = Self::lookup_and_filter_noderef_inner(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter)
|
||||
.ok_or_else(|| eyre!("couldn't look up relay"))?;
|
||||
if target_node_ref.node_id() != target_key {
|
||||
bail!("target noderef didn't match target key");
|
||||
@ -429,14 +450,12 @@ impl RoutingTable {
|
||||
NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref)
|
||||
}
|
||||
ContactMethod::InboundRelay(relay_key) => {
|
||||
let relay_nr = self
|
||||
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
|
||||
let relay_nr = Self::lookup_and_filter_noderef_nner(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter)
|
||||
.ok_or_else(|| eyre!("couldn't look up relay"))?;
|
||||
NodeContactMethod::InboundRelay(relay_nr)
|
||||
}
|
||||
ContactMethod::OutboundRelay(relay_key) => {
|
||||
let relay_nr = self
|
||||
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)
|
||||
let relay_nr = Self::lookup_and_filter_noderef(inner, self.clone(), relay_key, routing_domain.into(), dial_info_filter)
|
||||
.ok_or_else(|| eyre!("couldn't look up relay"))?;
|
||||
NodeContactMethod::OutboundRelay(relay_nr)
|
||||
}
|
||||
@ -486,13 +505,19 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
/// Return a copy of our node's nodeinfo
|
||||
pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo {
|
||||
let inner = &*self.inner.read();
|
||||
fn get_own_node_info_inner(
|
||||
inner: &RoutingTableInner,
|
||||
routing_domain: RoutingDomain,
|
||||
) -> NodeInfo {
|
||||
Self::with_routing_domain(inner, routing_domain, |rdd| {
|
||||
rdd.common()
|
||||
.with_peer_info(|pi| pi.signed_node_info.node_info.clone())
|
||||
})
|
||||
}
|
||||
pub fn get_own_node_info(&self, routing_domain: RoutingDomain) -> NodeInfo {
|
||||
let inner = &*self.inner.read();
|
||||
Self::get_own_node_info_inner(inner, routing_domain)
|
||||
}
|
||||
|
||||
/// Return our currently registered network class
|
||||
pub fn has_valid_own_node_info(&self, routing_domain: RoutingDomain) -> bool {
|
||||
@ -880,6 +905,15 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
/// Resolve an existing routing table entry and return a reference to it
|
||||
fn lookup_node_ref_inner(inner: &RoutingTableInner, routing_table: RoutingTable, node_id: DHTKey) -> Option<NodeRef> {
|
||||
{
|
||||
let idx = routing_table.find_bucket_index(node_id);
|
||||
let bucket = &inner.buckets[idx];
|
||||
bucket
|
||||
.entry(&node_id)
|
||||
.map(|e| NodeRef::new(routing_table, node_id, e, None))
|
||||
}
|
||||
|
||||
pub fn lookup_node_ref(&self, node_id: DHTKey) -> Option<NodeRef> {
|
||||
if node_id == self.unlocked_inner.node_id {
|
||||
log_rtab!(debug "can't look up own node id in routing table");
|
||||
|
@ -5,6 +5,8 @@ use serde::*;
|
||||
/// Options for safety routes (sender privacy)
|
||||
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct SafetySpec {
|
||||
/// preferred safety route if it still exists
|
||||
pub preferred_route: Option<DHTKey>,
|
||||
/// 0 = no safety route, just use node's node id, more hops is safer but slower
|
||||
pub hop_count: usize,
|
||||
/// prefer more reliable protocols and relays over faster ones
|
||||
@ -76,6 +78,10 @@ pub struct RouteSpecStoreCache {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RouteSpecStore {
|
||||
/// Our node id
|
||||
node_id: DHTKey,
|
||||
/// Our node id secret
|
||||
node_id_secret: DHTKeySecret,
|
||||
/// Maximum number of hops in a route
|
||||
max_route_hop_count: usize,
|
||||
/// Default number of hops in a route
|
||||
@ -83,7 +89,7 @@ pub struct RouteSpecStore {
|
||||
/// Serialize RouteSpecStore content
|
||||
content: RouteSpecStoreContent,
|
||||
/// RouteSpecStore cache
|
||||
cache: Mutex<RouteSpecStoreCache>,
|
||||
cache: RouteSpecStoreCache,
|
||||
}
|
||||
|
||||
fn route_hops_to_hop_cache(hops: &[DHTKey]) -> Vec<u8> {
|
||||
@ -170,16 +176,13 @@ where
|
||||
|
||||
impl RouteSpecStore {
|
||||
pub fn new(config: VeilidConfig) -> Self {
|
||||
let (max_route_hop_count, default_route_hop_count) = {
|
||||
let c = config.get();
|
||||
let max_route_hop_count = c.network.rpc.max_route_hop_count;
|
||||
let default_route_hop_count = c.network.rpc.max_route_hop_count;
|
||||
(max_route_hop_count.into(), default_route_hop_count.into())
|
||||
};
|
||||
let c = config.get();
|
||||
|
||||
Self {
|
||||
max_route_hop_count,
|
||||
default_route_hop_count,
|
||||
node_id: c.network.node_id,
|
||||
node_id_secret: c.network.node_id_secret,
|
||||
max_route_hop_count: c.network.rpc.max_route_hop_count.into(),
|
||||
default_route_hop_count: c.network.rpc.default_route_hop_count.into(),
|
||||
content: RouteSpecStoreContent {
|
||||
details: HashMap::new(),
|
||||
},
|
||||
@ -188,20 +191,17 @@ impl RouteSpecStore {
|
||||
}
|
||||
|
||||
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> {
|
||||
let (max_route_hop_count, default_route_hop_count) = {
|
||||
let c = routing_table.unlocked_inner.config.get();
|
||||
let max_route_hop_count = c.network.rpc.max_route_hop_count;
|
||||
let default_route_hop_count = c.network.rpc.max_route_hop_count;
|
||||
(max_route_hop_count.into(), default_route_hop_count.into())
|
||||
};
|
||||
|
||||
let config = routing_table.network_manager().config();
|
||||
let c = config.get();
|
||||
// Get cbor blob from table store
|
||||
let table_store = routing_table.network_manager().table_store();
|
||||
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
|
||||
let content = rsstdb.load_cbor(0, b"content").await?.unwrap_or_default();
|
||||
let mut rss = RouteSpecStore {
|
||||
max_route_hop_count,
|
||||
default_route_hop_count,
|
||||
node_id: c.network.node_id,
|
||||
node_id_secret: c.network.node_id_secret,
|
||||
max_route_hop_count: c.network.rpc.max_route_hop_count.into(),
|
||||
default_route_hop_count: c.network.rpc.default_route_hop_count.into(),
|
||||
content,
|
||||
cache: Default::default(),
|
||||
};
|
||||
@ -284,17 +284,17 @@ impl RouteSpecStore {
|
||||
}
|
||||
}
|
||||
|
||||
fn detail_mut(&mut self, public_key: &DHTKey) -> &mut RouteSpecDetail {
|
||||
self.content.details.get_mut(&public_key).unwrap()
|
||||
fn detail_mut(&mut self, public_key: &DHTKey) -> Option<&mut RouteSpecDetail> {
|
||||
self.content.details.get_mut(&public_key)
|
||||
}
|
||||
|
||||
/// Create a new route
|
||||
/// Prefers nodes that are not currently in use by another route
|
||||
/// The route is not yet tested for its reachability
|
||||
/// Returns None if no route could be allocated at this time
|
||||
pub async fn allocate_route(
|
||||
pub fn allocate_route(
|
||||
&mut self,
|
||||
routing_table: RoutingTable,
|
||||
rti: &RoutingTableInner,
|
||||
reliable: bool,
|
||||
hop_count: usize,
|
||||
directions: DirectionSet,
|
||||
@ -309,10 +309,6 @@ impl RouteSpecStore {
|
||||
bail!("Not allocating route longer than max route hop count");
|
||||
}
|
||||
|
||||
// Lock routing table for reading, make sure things don't change
|
||||
// because we want to iterate the table without changes being made to it
|
||||
let rti = routing_table.inner.read();
|
||||
|
||||
// Get list of all nodes, and sort them for selection
|
||||
let cur_ts = intf::get_timestamp();
|
||||
let dial_info_sort = if reliable {
|
||||
@ -417,12 +413,20 @@ impl RouteSpecStore {
|
||||
};
|
||||
|
||||
// Pull the whole routing table in sorted order
|
||||
let node_count = routing_table.get_entry_count(
|
||||
let node_count = RoutingTable::get_entry_count_inner(
|
||||
rti,
|
||||
RoutingDomain::PublicInternet.into(),
|
||||
BucketEntryState::Unreliable,
|
||||
);
|
||||
let nodes = routing_table
|
||||
.find_peers_with_sort_and_filter(node_count, cur_ts, filter, compare, transform);
|
||||
let nodes = RoutingTable::find_peers_with_sort_and_filter_inner(
|
||||
rti,
|
||||
self.node_id,
|
||||
node_count,
|
||||
cur_ts,
|
||||
filter,
|
||||
compare,
|
||||
transform,
|
||||
);
|
||||
|
||||
// If we couldn't find enough nodes, wait until we have more nodes in the routing table
|
||||
if nodes.len() < hop_count {
|
||||
@ -447,13 +451,14 @@ impl RouteSpecStore {
|
||||
// Ensure this route is viable by checking that each node can contact the next one
|
||||
if directions.contains(Direction::Outbound) {
|
||||
let our_node_info =
|
||||
routing_table.get_own_node_info(RoutingDomain::PublicInternet);
|
||||
let our_node_id = routing_table.node_id();
|
||||
RoutingTable::get_own_node_info_inner(rti, RoutingDomain::PublicInternet);
|
||||
let our_node_id = self.node_id;
|
||||
let mut previous_node = &(our_node_id, our_node_info);
|
||||
let mut reachable = true;
|
||||
for n in permutation {
|
||||
let current_node = nodes.get(*n).unwrap();
|
||||
let cm = routing_table.get_contact_method(
|
||||
let cm = RoutingTable::get_contact_method_inner(
|
||||
rti,
|
||||
RoutingDomain::PublicInternet,
|
||||
&previous_node.0,
|
||||
&previous_node.1,
|
||||
@ -474,13 +479,14 @@ impl RouteSpecStore {
|
||||
}
|
||||
if directions.contains(Direction::Inbound) {
|
||||
let our_node_info =
|
||||
routing_table.get_own_node_info(RoutingDomain::PublicInternet);
|
||||
let our_node_id = routing_table.node_id();
|
||||
RoutingTable::get_own_node_info_inner(rti, RoutingDomain::PublicInternet);
|
||||
let our_node_id = self.node_id;
|
||||
let mut next_node = &(our_node_id, our_node_info);
|
||||
let mut reachable = true;
|
||||
for n in permutation.iter().rev() {
|
||||
let current_node = nodes.get(*n).unwrap();
|
||||
let cm = routing_table.get_contact_method(
|
||||
let cm = RoutingTable::get_contact_method_inner(
|
||||
rti,
|
||||
RoutingDomain::PublicInternet,
|
||||
&next_node.0,
|
||||
&next_node.1,
|
||||
@ -608,20 +614,53 @@ impl RouteSpecStore {
|
||||
|
||||
/// Compiles a safety route to the private route, with caching
|
||||
pub fn compile_safety_route(
|
||||
&self,
|
||||
&mut self,
|
||||
rti: &RoutingTableInner,
|
||||
safety_spec: SafetySpec,
|
||||
private_route: PrivateRoute,
|
||||
) -> Result<CompiledRoute, RPCError> {
|
||||
let pr_hopcount = private_route.hop_count as usize;
|
||||
if pr_hopcount > self.max_route_hop_count {
|
||||
return Err(RPCError::internal("private route hop count too long"));
|
||||
}
|
||||
|
||||
// See if the preferred route is here
|
||||
let opt_safety_rsd: Option<&mut RouteSpecDetail> =
|
||||
if let Some(preferred_route) = safety_spec.preferred_route {
|
||||
self.detail_mut(&preferred_route)
|
||||
} else {
|
||||
// Preferred safety route was not requested
|
||||
None
|
||||
};
|
||||
let safety_rsd: &mut RouteSpecDetail = if let Some(safety_rsd) = opt_safety_rsd {
|
||||
// Safety route exists
|
||||
safety_rsd
|
||||
} else {
|
||||
// Select a safety route from the pool or make one if we don't have one that matches
|
||||
if let Some(sr_pubkey) = self.first_unpublished_route(
|
||||
safety_spec.reliable,
|
||||
safety_spec.hop_count,
|
||||
safety_spec.hop_count,
|
||||
Direction::Outbound.into(),
|
||||
) {
|
||||
// Found a route to use
|
||||
self.detail_mut(&sr_pubkey).unwrap()
|
||||
} else {
|
||||
// No route found, gotta allocate one
|
||||
self.allocate_route(rti)
|
||||
}
|
||||
};
|
||||
|
||||
// xxx implement caching first!
|
||||
|
||||
// xxx implement, ensure we handle hops == 0 for our safetyspec
|
||||
|
||||
// Ensure the total hop count isn't too long for our config
|
||||
let pr_hopcount = private_route.hop_count as usize;
|
||||
let sr_hopcount = safety_route_spec.hops.len();
|
||||
let hopcount = 1 + sr_hopcount + pr_hopcount;
|
||||
if hopcount > self.max_route_hop_count {
|
||||
return Err(RPCError::internal("hop count too long for route"));
|
||||
let sr_hopcount = safety_spec.hop_count;
|
||||
if sr_hopcount > self.max_route_hop_count {
|
||||
return Err(RPCError::internal("private route hop count too long"));
|
||||
}
|
||||
let total_hopcount = sr_hopcount + pr_hopcount;
|
||||
|
||||
// Create hops
|
||||
let hops = if sr_hopcount == 0 {
|
||||
@ -719,41 +758,66 @@ impl RouteSpecStore {
|
||||
/// Mark route as published
|
||||
/// When first deserialized, routes must be re-published in order to ensure they remain
|
||||
/// in the RouteSpecStore.
|
||||
pub fn mark_route_published(&mut self, key: &DHTKey) {
|
||||
self.detail_mut(&key).published = true;
|
||||
pub fn mark_route_published(&mut self, key: &DHTKey) -> EyreResult<()> {
|
||||
self.detail_mut(&key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.published = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark route as checked
|
||||
pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) {
|
||||
self.detail_mut(&key).last_checked_ts = Some(cur_ts);
|
||||
pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
|
||||
self.detail_mut(&key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.last_checked_ts = Some(cur_ts);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark route as used
|
||||
pub fn touch_route_used(&mut self, key: &DHTKey, cur_ts: u64) {
|
||||
self.detail_mut(&key).last_used_ts = Some(cur_ts);
|
||||
pub fn touch_route_used(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
|
||||
self.detail_mut(&key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.last_used_ts = Some(cur_ts);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Record latency on the route
|
||||
pub fn record_latency(&mut self, key: &DHTKey, latency: u64) {
|
||||
let lsa = &mut self.detail_mut(&key).latency_stats_accounting;
|
||||
pub fn record_latency(&mut self, key: &DHTKey, latency: u64) -> EyreResult<()> {
|
||||
let lsa = &mut self
|
||||
.detail_mut(&key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.latency_stats_accounting;
|
||||
self.detail_mut(&key).latency_stats = lsa.record_latency(latency);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the calculated latency stats
|
||||
pub fn latency_stats(&self, key: &DHTKey) -> LatencyStats {
|
||||
self.detail_mut(&key).latency_stats.clone()
|
||||
pub fn latency_stats(&mut self, key: &DHTKey) -> EyreResult<LatencyStats> {
|
||||
Ok(self
|
||||
.detail_mut(&key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.latency_stats
|
||||
.clone())
|
||||
}
|
||||
|
||||
/// Add download transfers to route
|
||||
pub fn add_down(&mut self, key: &DHTKey, bytes: u64) {
|
||||
let tsa = &mut self.detail_mut(&key).transfer_stats_accounting;
|
||||
pub fn add_down(&mut self, key: &DHTKey, bytes: u64) -> EyreResult<()> {
|
||||
let tsa = &mut self
|
||||
.detail_mut(&key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.transfer_stats_accounting;
|
||||
tsa.add_down(bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add upload transfers to route
|
||||
pub fn add_up(&mut self, key: &DHTKey, bytes: u64) {
|
||||
let tsa = &mut self.detail_mut(&key).transfer_stats_accounting;
|
||||
pub fn add_up(&mut self, key: &DHTKey, bytes: u64) -> EyreResult<()> {
|
||||
let tsa = &mut self
|
||||
.detail_mut(&key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.transfer_stats_accounting;
|
||||
tsa.add_up(bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process transfer statistics to get averages
|
||||
|
@ -8,8 +8,7 @@ pub enum Destination {
|
||||
/// The node to send to
|
||||
target: NodeRef,
|
||||
/// Require safety route or not
|
||||
xxx convert back to safety spec, bubble up to api
|
||||
safety: bool,
|
||||
safety: Option<SafetySpec>,
|
||||
},
|
||||
/// Send to node for relay purposes
|
||||
Relay {
|
||||
@ -18,14 +17,14 @@ pub enum Destination {
|
||||
/// The final destination the relay should send to
|
||||
target: DHTKey,
|
||||
/// Require safety route or not
|
||||
safety: bool,
|
||||
safety: Option<SafetySpec>,
|
||||
},
|
||||
/// Send to private route (privateroute)
|
||||
PrivateRoute {
|
||||
/// A private route to send to
|
||||
private_route: PrivateRoute,
|
||||
/// Require safety route or not
|
||||
safety: bool,
|
||||
safety: Option<SafetySpec>,
|
||||
/// Prefer reliability or not
|
||||
reliable: bool,
|
||||
},
|
||||
@ -35,29 +34,29 @@ impl Destination {
|
||||
pub fn direct(target: NodeRef) -> Self {
|
||||
Self::Direct {
|
||||
target,
|
||||
safety: false,
|
||||
safety: None,
|
||||
}
|
||||
}
|
||||
pub fn relay(relay: NodeRef, target: DHTKey) -> Self {
|
||||
Self::Relay {
|
||||
relay,
|
||||
target,
|
||||
safety: false,
|
||||
safety: None,
|
||||
}
|
||||
}
|
||||
pub fn private_route(private_route: PrivateRoute, reliable: bool) -> Self {
|
||||
Self::PrivateRoute {
|
||||
private_route,
|
||||
safety: false,
|
||||
safety: None,
|
||||
reliable,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_safety(self) -> Self {
|
||||
pub fn with_safety(self, spec: SafetySpec) -> Self {
|
||||
match self {
|
||||
Destination::Direct { target, safety: _ } => Self::Direct {
|
||||
target,
|
||||
safety: true,
|
||||
safety: Some(spec),
|
||||
},
|
||||
Destination::Relay {
|
||||
relay,
|
||||
@ -66,7 +65,7 @@ impl Destination {
|
||||
} => Self::Relay {
|
||||
relay,
|
||||
target,
|
||||
safety: true,
|
||||
safety: Some(spec),
|
||||
},
|
||||
Destination::PrivateRoute {
|
||||
private_route,
|
||||
@ -74,7 +73,7 @@ impl Destination {
|
||||
reliable,
|
||||
} => Self::PrivateRoute {
|
||||
private_route,
|
||||
safety: true,
|
||||
safety: Some(spec),
|
||||
reliable,
|
||||
},
|
||||
}
|
||||
@ -85,7 +84,7 @@ impl fmt::Display for Destination {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Destination::Direct { target, safety } => {
|
||||
let sr = if *safety { "+SR" } else { "" };
|
||||
let sr = if safety.is_some() { "+SR" } else { "" };
|
||||
|
||||
write!(f, "{}{}", target, sr)
|
||||
}
|
||||
@ -94,7 +93,7 @@ impl fmt::Display for Destination {
|
||||
target,
|
||||
safety,
|
||||
} => {
|
||||
let sr = if *safety { "+SR" } else { "" };
|
||||
let sr = if safety.is_some() { "+SR" } else { "" };
|
||||
|
||||
write!(f, "{}@{}{}", target.encode(), relay, sr)
|
||||
}
|
||||
@ -103,7 +102,7 @@ impl fmt::Display for Destination {
|
||||
safety,
|
||||
reliable,
|
||||
} => {
|
||||
let sr = if *safety { "+SR" } else { "" };
|
||||
let sr = if safety.is_some() { "+SR" } else { "" };
|
||||
let rl = if *reliable { "+RL" } else { "" };
|
||||
|
||||
write!(f, "{}{}{}", private_route, sr, rl)
|
||||
|
@ -409,13 +409,6 @@ impl RPCProcessor {
|
||||
rss.compile_safety_route(safety_spec, private_route)
|
||||
})?;
|
||||
|
||||
// Verify hop count isn't larger than out maximum routed hop count
|
||||
if compiled_route.safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count
|
||||
{
|
||||
return Err(RPCError::internal("hop count too long for route"))
|
||||
.map_err(logthru_rpc!(warn));
|
||||
}
|
||||
|
||||
// Encrypt routed operation
|
||||
// Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
|
||||
let nonce = Crypto::get_random_nonce();
|
||||
@ -613,17 +606,6 @@ impl RPCProcessor {
|
||||
hop_count,
|
||||
} = self.render_operation(dest, &operation)?;
|
||||
|
||||
// If we need to resolve the first hop, do it
|
||||
let node_ref = match node_ref {
|
||||
None => match self.resolve_node(node_id).await? {
|
||||
None => {
|
||||
return Ok(NetworkResult::no_connection_other(node_id));
|
||||
}
|
||||
Some(nr) => nr,
|
||||
},
|
||||
Some(nr) => nr,
|
||||
};
|
||||
|
||||
// Calculate answer timeout
|
||||
// Timeout is number of hops times the timeout per hop
|
||||
let timeout = self.unlocked_inner.timeout * (hop_count as u64);
|
||||
@ -687,17 +669,6 @@ impl RPCProcessor {
|
||||
hop_count: _,
|
||||
} = self.render_operation(dest, &operation)?;
|
||||
|
||||
// If we need to resolve the first hop, do it
|
||||
let node_ref = match node_ref {
|
||||
None => match self.resolve_node(node_id).await? {
|
||||
None => {
|
||||
return Ok(NetworkResult::no_connection_other(node_id));
|
||||
}
|
||||
Some(nr) => nr,
|
||||
},
|
||||
Some(nr) => nr,
|
||||
};
|
||||
|
||||
// Send statement
|
||||
let bytes = message.len() as u64;
|
||||
let send_ts = intf::get_timestamp();
|
||||
@ -782,17 +753,6 @@ impl RPCProcessor {
|
||||
hop_count: _,
|
||||
} = self.render_operation(dest, &operation)?;
|
||||
|
||||
// If we need to resolve the first hop, do it
|
||||
let node_ref = match node_ref {
|
||||
None => match self.resolve_node(node_id).await? {
|
||||
None => {
|
||||
return Ok(NetworkResult::no_connection_other(node_id));
|
||||
}
|
||||
Some(nr) => nr,
|
||||
},
|
||||
Some(nr) => nr,
|
||||
};
|
||||
|
||||
// Send the reply
|
||||
let bytes = message.len() as u64;
|
||||
let send_ts = intf::get_timestamp();
|
||||
|
@ -641,9 +641,9 @@ impl VeilidConfig {
|
||||
"max route hop count must be >= 1 in 'network.rpc.max_route_hop_count'"
|
||||
);
|
||||
}
|
||||
if inner.network.rpc.max_route_hop_count > 7 {
|
||||
if inner.network.rpc.max_route_hop_count > 5 {
|
||||
apibail_generic!(
|
||||
"max route hop count must be <= 7 in 'network.rpc.max_route_hop_count'"
|
||||
"max route hop count must be <= 5 in 'network.rpc.max_route_hop_count'"
|
||||
);
|
||||
}
|
||||
if inner.network.rpc.default_route_hop_count == 0 {
|
||||
|
@ -65,7 +65,7 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
|
||||
maxTimestampBehindMs: 10000,
|
||||
maxTimestampAheadMs: 10000,
|
||||
timeoutMs: 10000,
|
||||
maxRouteHopCount: 7,
|
||||
maxRouteHopCount: 4,
|
||||
defaultRouteHopCount: 2,
|
||||
),
|
||||
dht: VeilidConfigDHT(
|
||||
|
@ -45,7 +45,7 @@ fn init_callbacks() {
|
||||
case "network.rpc.max_timestamp_behind": return 10000000;
|
||||
case "network.rpc.max_timestamp_ahead": return 10000000;
|
||||
case "network.rpc.timeout": return 10000000;
|
||||
case "network.rpc.max_route_hop_count": return 7;
|
||||
case "network.rpc.max_route_hop_count": return 4;
|
||||
case "network.rpc.default_route_hop_count": return 2;
|
||||
case "network.dht.resolve_node_timeout": return null;
|
||||
case "network.dht.resolve_node_count": return 20;
|
||||
|
Loading…
Reference in New Issue
Block a user