checkpoint
This commit is contained in:
parent
4e1c3391df
commit
27f7f49d4f
@ -64,7 +64,7 @@ core:
|
|||||||
max_timestamp_ahead_ms: 10000
|
max_timestamp_ahead_ms: 10000
|
||||||
timeout_ms: 10000
|
timeout_ms: 10000
|
||||||
max_route_hop_count: 4
|
max_route_hop_count: 4
|
||||||
default_route_hop_count: 2
|
default_route_hop_count: 1
|
||||||
|
|
||||||
dht:
|
dht:
|
||||||
resolve_node_timeout:
|
resolve_node_timeout:
|
||||||
|
@ -229,7 +229,7 @@ rpc:
|
|||||||
max_timestamp_ahead_ms: 10000
|
max_timestamp_ahead_ms: 10000
|
||||||
timeout_ms: 10000
|
timeout_ms: 10000
|
||||||
max_route_hop_count: 4
|
max_route_hop_count: 4
|
||||||
default_route_hop_count: 2
|
default_route_hop_count: 1
|
||||||
```
|
```
|
||||||
|
|
||||||
#### core:network:dht
|
#### core:network:dht
|
||||||
|
@ -85,8 +85,11 @@ pub struct BucketEntryInner {
|
|||||||
/// The minimum and maximum range of cryptography versions supported by the node,
|
/// The minimum and maximum range of cryptography versions supported by the node,
|
||||||
/// inclusive of the requirements of any relay the node may be using
|
/// inclusive of the requirements of any relay the node may be using
|
||||||
min_max_version: Option<VersionRange>,
|
min_max_version: Option<VersionRange>,
|
||||||
/// Whether or not we have updated this peer with our node info since our network
|
/// If this node has updated it's SignedNodeInfo since our network
|
||||||
/// and dial info has last changed, for example when our IP address changes
|
/// and dial info has last changed, for example when our IP address changes
|
||||||
|
/// Used to determine if we should make this entry 'live' again when we receive a signednodeinfo update that
|
||||||
|
/// has the same timestamp, because if we change our own IP address or network class it may be possible for nodes that were
|
||||||
|
/// unreachable may now be reachable with the same SignedNodeInfo/DialInfo
|
||||||
updated_since_last_network_change: bool,
|
updated_since_last_network_change: bool,
|
||||||
/// The last connection descriptors used to contact this node, per protocol type
|
/// The last connection descriptors used to contact this node, per protocol type
|
||||||
#[with(Skip)]
|
#[with(Skip)]
|
||||||
|
@ -1268,35 +1268,32 @@ impl RouteSpecStore {
|
|||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a remote private route as having seen our node info {
|
/// Mark a remote private route as having seen our node info
|
||||||
pub fn mark_remote_private_route_seen_our_node_info(&self, key: &DHTKey) {
|
pub fn mark_remote_private_route_seen_our_node_info(&self, key: &DHTKey, cur_ts: u64) {
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
let cur_ts = intf::get_timestamp();
|
|
||||||
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
||||||
rpr.seen_our_node_info = true;
|
rpr.seen_our_node_info = true;
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a remote private route as having replied to a question {
|
/// Mark a remote private route as having replied to a question {
|
||||||
pub fn mark_remote_private_route_replied(&self, key: &DHTKey) {
|
pub fn mark_remote_private_route_replied(&self, key: &DHTKey, cur_ts: u64) {
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
let cur_ts = intf::get_timestamp();
|
|
||||||
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
||||||
rpr.last_replied_ts = Some(cur_ts);
|
rpr.last_replied_ts = Some(cur_ts);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a remote private route as having beed used {
|
/// Mark a remote private route as having beed used {
|
||||||
pub fn mark_remote_private_route_used(&self, key: &DHTKey) {
|
pub fn mark_remote_private_route_used(&self, key: &DHTKey, cur_ts: u64) {
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
let cur_ts = intf::get_timestamp();
|
|
||||||
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
Self::with_create_remote_private_route(inner, cur_ts, key, |rpr| {
|
||||||
rpr.last_used_ts = Some(cur_ts);
|
rpr.last_used_ts = Some(cur_ts);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clear caches when local our local node info changes
|
/// Clear caches when local our local node info changes
|
||||||
pub fn local_node_info_changed(&self) {
|
pub fn reset(&self) {
|
||||||
let inner = &mut *self.inner.lock();
|
let inner = &mut *self.inner.lock();
|
||||||
|
|
||||||
// Clean up local allocated routes
|
// Clean up local allocated routes
|
||||||
|
@ -194,14 +194,25 @@ impl RoutingDomainEditor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if changed {
|
if changed {
|
||||||
|
// Clear our 'peer info' cache, the peerinfo for this routing domain will get regenerated next time it is asked for
|
||||||
detail.common_mut().clear_cache()
|
detail.common_mut().clear_cache()
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if changed {
|
if changed {
|
||||||
|
// Mark that nothing in the routing table has seen our new node info
|
||||||
inner.reset_all_seen_our_node_info(self.routing_domain);
|
inner.reset_all_seen_our_node_info(self.routing_domain);
|
||||||
|
//
|
||||||
inner.reset_all_updated_since_last_network_change();
|
inner.reset_all_updated_since_last_network_change();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Clear the routespecstore cache if our PublicInternet dial info has changed
|
||||||
|
if changed {
|
||||||
|
if self.routing_domain == RoutingDomain::PublicInternet {
|
||||||
|
let rss = self.routing_table.route_spec_store();
|
||||||
|
rss.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Send our updated node info to all the nodes in the routing table
|
||||||
if changed && self.send_node_info_updates {
|
if changed && self.send_node_info_updates {
|
||||||
let network_manager = self.routing_table.unlocked_inner.network_manager.clone();
|
let network_manager = self.routing_table.unlocked_inner.network_manager.clone();
|
||||||
network_manager
|
network_manager
|
||||||
|
@ -162,6 +162,7 @@ where
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct WaitableReply {
|
struct WaitableReply {
|
||||||
|
dest: Destination,
|
||||||
handle: OperationWaitHandle<RPCMessage>,
|
handle: OperationWaitHandle<RPCMessage>,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
node_ref: NodeRef,
|
node_ref: NodeRef,
|
||||||
@ -441,7 +442,27 @@ impl RPCProcessor {
|
|||||||
waitable_reply.send_ts,
|
waitable_reply.send_ts,
|
||||||
recv_ts,
|
recv_ts,
|
||||||
rpcreader.header.body_len,
|
rpcreader.header.body_len,
|
||||||
)
|
);
|
||||||
|
// Process private route replies
|
||||||
|
if let Destination::PrivateRoute {
|
||||||
|
private_route,
|
||||||
|
safety_selection,
|
||||||
|
} = &waitable_reply.dest
|
||||||
|
{
|
||||||
|
let rss = self.routing_table.route_spec_store();
|
||||||
|
|
||||||
|
// If we received a reply from a private route, mark it as such
|
||||||
|
rss.mark_remote_private_route_replied(&private_route.public_key, recv_ts);
|
||||||
|
|
||||||
|
// If we sent to a private route without a safety route
|
||||||
|
// We need to mark our own node info as having been seen so we can optimize sending it
|
||||||
|
if let SafetySelection::Unsafe(_) = safety_selection {
|
||||||
|
rss.mark_remote_private_route_seen_our_node_info(
|
||||||
|
&private_route.public_key,
|
||||||
|
recv_ts,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -702,7 +723,7 @@ impl RPCProcessor {
|
|||||||
node_id,
|
node_id,
|
||||||
node_ref,
|
node_ref,
|
||||||
hop_count,
|
hop_count,
|
||||||
} = network_result_try!(self.render_operation(dest, &operation)?);
|
} = network_result_try!(self.render_operation(dest.clone(), &operation)?);
|
||||||
|
|
||||||
// Calculate answer timeout
|
// Calculate answer timeout
|
||||||
// Timeout is number of hops times the timeout per hop
|
// Timeout is number of hops times the timeout per hop
|
||||||
@ -733,8 +754,19 @@ impl RPCProcessor {
|
|||||||
// Successfully sent
|
// Successfully sent
|
||||||
node_ref.stats_question_sent(send_ts, bytes, true);
|
node_ref.stats_question_sent(send_ts, bytes, true);
|
||||||
|
|
||||||
|
// Private route stats
|
||||||
|
if let Destination::PrivateRoute {
|
||||||
|
private_route,
|
||||||
|
safety_selection: _,
|
||||||
|
} = &dest
|
||||||
|
{
|
||||||
|
let rss = self.routing_table.route_spec_store();
|
||||||
|
rss.mark_remote_private_route_used(&private_route.public_key, intf::get_timestamp());
|
||||||
|
}
|
||||||
|
|
||||||
// Pass back waitable reply completion
|
// Pass back waitable reply completion
|
||||||
Ok(NetworkResult::value(WaitableReply {
|
Ok(NetworkResult::value(WaitableReply {
|
||||||
|
dest,
|
||||||
handle,
|
handle,
|
||||||
timeout,
|
timeout,
|
||||||
node_ref,
|
node_ref,
|
||||||
|
@ -207,7 +207,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
|
|||||||
"network.rpc.max_timestamp_ahead_ms" => Ok(Box::new(Some(10_000u32))),
|
"network.rpc.max_timestamp_ahead_ms" => Ok(Box::new(Some(10_000u32))),
|
||||||
"network.rpc.timeout_ms" => Ok(Box::new(10_000u32)),
|
"network.rpc.timeout_ms" => Ok(Box::new(10_000u32)),
|
||||||
"network.rpc.max_route_hop_count" => Ok(Box::new(4u8)),
|
"network.rpc.max_route_hop_count" => Ok(Box::new(4u8)),
|
||||||
"network.rpc.default_route_hop_count" => Ok(Box::new(2u8)),
|
"network.rpc.default_route_hop_count" => Ok(Box::new(1u8)),
|
||||||
"network.dht.resolve_node_timeout_ms" => Ok(Box::new(Option::<u32>::None)),
|
"network.dht.resolve_node_timeout_ms" => Ok(Box::new(Option::<u32>::None)),
|
||||||
"network.dht.resolve_node_count" => Ok(Box::new(20u32)),
|
"network.dht.resolve_node_count" => Ok(Box::new(20u32)),
|
||||||
"network.dht.resolve_node_fanout" => Ok(Box::new(3u32)),
|
"network.dht.resolve_node_fanout" => Ok(Box::new(3u32)),
|
||||||
@ -325,7 +325,7 @@ pub async fn test_config() {
|
|||||||
assert_eq!(inner.network.rpc.queue_size, 1024u32);
|
assert_eq!(inner.network.rpc.queue_size, 1024u32);
|
||||||
assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);
|
assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);
|
||||||
assert_eq!(inner.network.rpc.max_route_hop_count, 4u8);
|
assert_eq!(inner.network.rpc.max_route_hop_count, 4u8);
|
||||||
assert_eq!(inner.network.rpc.default_route_hop_count, 2u8);
|
assert_eq!(inner.network.rpc.default_route_hop_count, 1u8);
|
||||||
assert_eq!(inner.network.routing_table.limit_over_attached, 64u32);
|
assert_eq!(inner.network.routing_table.limit_over_attached, 64u32);
|
||||||
assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32);
|
assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32);
|
||||||
assert_eq!(inner.network.routing_table.limit_attached_strong, 16u32);
|
assert_eq!(inner.network.routing_table.limit_attached_strong, 16u32);
|
||||||
|
@ -83,7 +83,8 @@ impl<E: Send + 'static> TickTask<E> {
|
|||||||
let now = intf::get_timestamp();
|
let now = intf::get_timestamp();
|
||||||
let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
|
let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
|
||||||
|
|
||||||
if last_timestamp_us != 0u64 && (now - last_timestamp_us) < self.tick_period_us {
|
if last_timestamp_us != 0u64 && now.saturating_sub(last_timestamp_us) < self.tick_period_us
|
||||||
|
{
|
||||||
// It's not time yet
|
// It's not time yet
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,7 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
|
|||||||
maxTimestampAheadMs: 10000,
|
maxTimestampAheadMs: 10000,
|
||||||
timeoutMs: 10000,
|
timeoutMs: 10000,
|
||||||
maxRouteHopCount: 4,
|
maxRouteHopCount: 4,
|
||||||
defaultRouteHopCount: 2,
|
defaultRouteHopCount: 1,
|
||||||
),
|
),
|
||||||
dht: VeilidConfigDHT(
|
dht: VeilidConfigDHT(
|
||||||
resolveNodeTimeoutMs: null,
|
resolveNodeTimeoutMs: null,
|
||||||
|
@ -84,7 +84,7 @@ core:
|
|||||||
max_timestamp_ahead_ms: 10000
|
max_timestamp_ahead_ms: 10000
|
||||||
timeout_ms: 10000
|
timeout_ms: 10000
|
||||||
max_route_hop_count: 4
|
max_route_hop_count: 4
|
||||||
default_route_hop_count: 2
|
default_route_hop_count: 1
|
||||||
dht:
|
dht:
|
||||||
resolve_node_timeout:
|
resolve_node_timeout:
|
||||||
resolve_node_count: 20
|
resolve_node_count: 20
|
||||||
@ -1510,7 +1510,7 @@ mod tests {
|
|||||||
assert_eq!(s.core.network.rpc.max_timestamp_ahead_ms, Some(10_000u32));
|
assert_eq!(s.core.network.rpc.max_timestamp_ahead_ms, Some(10_000u32));
|
||||||
assert_eq!(s.core.network.rpc.timeout_ms, 10_000u32);
|
assert_eq!(s.core.network.rpc.timeout_ms, 10_000u32);
|
||||||
assert_eq!(s.core.network.rpc.max_route_hop_count, 4);
|
assert_eq!(s.core.network.rpc.max_route_hop_count, 4);
|
||||||
assert_eq!(s.core.network.rpc.default_route_hop_count, 2);
|
assert_eq!(s.core.network.rpc.default_route_hop_count, 1);
|
||||||
//
|
//
|
||||||
assert_eq!(s.core.network.dht.resolve_node_timeout_ms, None);
|
assert_eq!(s.core.network.dht.resolve_node_timeout_ms, None);
|
||||||
assert_eq!(s.core.network.dht.resolve_node_count, 20u32);
|
assert_eq!(s.core.network.dht.resolve_node_count, 20u32);
|
||||||
|
@ -46,7 +46,7 @@ fn init_callbacks() {
|
|||||||
case "network.rpc.max_timestamp_ahead": return 10000000;
|
case "network.rpc.max_timestamp_ahead": return 10000000;
|
||||||
case "network.rpc.timeout": return 10000000;
|
case "network.rpc.timeout": return 10000000;
|
||||||
case "network.rpc.max_route_hop_count": return 4;
|
case "network.rpc.max_route_hop_count": return 4;
|
||||||
case "network.rpc.default_route_hop_count": return 2;
|
case "network.rpc.default_route_hop_count": return 1;
|
||||||
case "network.dht.resolve_node_timeout": return null;
|
case "network.dht.resolve_node_timeout": return null;
|
||||||
case "network.dht.resolve_node_count": return 20;
|
case "network.dht.resolve_node_count": return 20;
|
||||||
case "network.dht.resolve_node_fanout": return 3;
|
case "network.dht.resolve_node_fanout": return 3;
|
||||||
|
Loading…
Reference in New Issue
Block a user