veilid/veilid-core/src/routing_table/node_ref.rs

596 lines
19 KiB
Rust
Raw Normal View History

2021-11-22 16:28:30 +00:00
use super::*;
2022-10-30 23:29:31 +00:00
use crate::crypto::*;
2021-11-22 16:28:30 +00:00
use alloc::fmt;
2022-11-04 02:02:40 +00:00
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2022-09-03 17:57:25 +00:00
2022-11-04 02:02:40 +00:00
pub struct NodeRefBaseCommon {
2021-11-22 16:28:30 +00:00
routing_table: RoutingTable,
2022-06-25 14:57:33 +00:00
entry: Arc<BucketEntry>,
2022-09-03 17:57:25 +00:00
filter: Option<NodeRefFilter>,
2022-10-21 14:35:03 +00:00
sequencing: Sequencing,
2022-05-25 15:12:19 +00:00
#[cfg(feature = "tracking")]
track_id: usize,
2021-11-22 16:28:30 +00:00
}
2022-11-04 02:02:40 +00:00
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2022-05-25 15:12:19 +00:00
2022-11-04 02:02:40 +00:00
pub trait NodeRefBase: Sized {
// Common field access
fn common(&self) -> &NodeRefBaseCommon;
fn common_mut(&mut self) -> &mut NodeRefBaseCommon;
2021-11-22 16:28:30 +00:00
2023-02-15 23:18:08 +00:00
// Comparators
fn same_entry<T: NodeRefBase>(&self, other: &T) -> bool {
Arc::ptr_eq(&self.common().entry, &other.common().entry)
}
fn same_bucket_entry(&self, entry: &Arc<BucketEntry>) -> bool {
Arc::ptr_eq(&self.common().entry, entry)
}
2022-11-04 02:02:40 +00:00
// Implementation-specific operators
fn operate<T, F>(&self, f: F) -> T
2022-09-03 17:57:25 +00:00
where
2022-11-04 02:02:40 +00:00
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T;
fn operate_mut<T, F>(&self, f: F) -> T
2022-09-03 17:57:25 +00:00
where
2022-11-04 02:02:40 +00:00
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T;
2022-09-03 17:57:25 +00:00
// Filtering
2022-11-04 02:02:40 +00:00
fn filter_ref(&self) -> Option<&NodeRefFilter> {
self.common().filter.as_ref()
2022-04-21 00:49:16 +00:00
}
2022-11-04 02:02:40 +00:00
fn take_filter(&mut self) -> Option<NodeRefFilter> {
self.common_mut().filter.take()
2022-04-21 00:49:16 +00:00
}
2022-11-04 02:02:40 +00:00
fn set_filter(&mut self, filter: Option<NodeRefFilter>) {
self.common_mut().filter = filter
2022-04-21 00:49:16 +00:00
}
2022-11-04 02:02:40 +00:00
fn set_sequencing(&mut self, sequencing: Sequencing) {
self.common_mut().sequencing = sequencing;
}
2022-11-04 02:02:40 +00:00
fn sequencing(&self) -> Sequencing {
self.common().sequencing
2022-10-13 02:53:40 +00:00
}
2022-11-04 02:02:40 +00:00
fn merge_filter(&mut self, filter: NodeRefFilter) {
let common_mut = self.common_mut();
if let Some(self_filter) = common_mut.filter.take() {
common_mut.filter = Some(self_filter.filtered(&filter));
2022-08-02 01:06:31 +00:00
} else {
2022-11-04 02:02:40 +00:00
common_mut.filter = Some(filter);
2022-08-02 01:06:31 +00:00
}
}
2022-11-04 02:02:40 +00:00
fn is_filter_dead(&self) -> bool {
if let Some(filter) = &self.common().filter {
2022-08-02 01:06:31 +00:00
filter.is_dead()
} else {
false
2022-04-21 00:49:16 +00:00
}
}
2022-11-04 02:02:40 +00:00
fn routing_domain_set(&self) -> RoutingDomainSet {
self.common()
.filter
2022-09-04 19:40:35 +00:00
.as_ref()
2022-09-03 17:57:25 +00:00
.map(|f| f.routing_domain_set)
.unwrap_or(RoutingDomainSet::all())
2022-06-25 14:57:33 +00:00
}
2022-11-04 02:02:40 +00:00
fn dial_info_filter(&self) -> DialInfoFilter {
self.common()
.filter
2022-09-04 19:40:35 +00:00
.as_ref()
.map(|f| f.dial_info_filter.clone())
2022-09-04 18:17:28 +00:00
.unwrap_or(DialInfoFilter::all())
}
2022-11-04 02:02:40 +00:00
fn best_routing_domain(&self) -> Option<RoutingDomain> {
2022-12-08 17:48:01 +00:00
self.operate(|rti, e| {
2022-09-03 17:57:25 +00:00
e.best_routing_domain(
2022-12-08 17:48:01 +00:00
rti,
2022-11-04 02:02:40 +00:00
self.common()
.filter
2022-09-04 19:40:35 +00:00
.as_ref()
2022-09-03 17:57:25 +00:00
.map(|f| f.routing_domain_set)
.unwrap_or(RoutingDomainSet::all()),
)
})
2021-11-22 16:28:30 +00:00
}
2022-09-03 17:57:25 +00:00
// Accessors
2022-11-04 02:02:40 +00:00
fn routing_table(&self) -> RoutingTable {
self.common().routing_table.clone()
2022-04-16 15:18:54 +00:00
}
2023-02-11 20:54:55 +00:00
fn node_ids(&self) -> TypedKeySet {
2023-02-10 02:01:04 +00:00
self.operate(|_rti, e| e.node_ids())
2022-03-25 02:07:55 +00:00
}
2023-02-12 04:16:32 +00:00
fn best_node_id(&self) -> TypedKey {
self.operate(|_rti, e| e.best_node_id())
}
2022-11-04 02:02:40 +00:00
fn has_updated_since_last_network_change(&self) -> bool {
2022-08-31 01:21:16 +00:00
self.operate(|_rti, e| e.has_updated_since_last_network_change())
2022-08-27 02:52:08 +00:00
}
2022-11-04 02:02:40 +00:00
fn set_updated_since_last_network_change(&self) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|_rti, e| e.set_updated_since_last_network_change(true));
2022-08-27 02:52:08 +00:00
}
2022-11-04 02:02:40 +00:00
fn update_node_status(&self, node_status: NodeStatus) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|_rti, e| {
e.update_node_status(node_status);
});
2022-04-21 00:49:16 +00:00
}
2023-02-11 20:54:55 +00:00
fn envelope_support(&self) -> Vec<u8> {
self.operate(|_rti, e| e.envelope_support())
2022-04-21 00:49:16 +00:00
}
2023-02-11 20:54:55 +00:00
fn add_envelope_version(&self, envelope_version: u8) {
self.operate_mut(|_rti, e| e.add_envelope_version(envelope_version))
}
fn set_envelope_support(&self, envelope_support: Vec<u8>) {
self.operate_mut(|_rti, e| e.set_envelope_support(envelope_support))
2022-08-31 01:21:16 +00:00
}
2023-02-17 22:47:21 +00:00
fn best_envelope_version(&self) -> Option<u8> {
self.operate(|_rti, e| e.best_envelope_version())
}
2022-12-17 01:07:28 +00:00
fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
2022-08-31 01:21:16 +00:00
self.operate(|_rti, e| e.state(cur_ts))
}
2022-11-04 02:02:40 +00:00
fn peer_stats(&self) -> PeerStats {
2022-09-06 20:49:43 +00:00
self.operate(|_rti, e| e.peer_stats().clone())
}
2022-08-31 01:21:16 +00:00
2022-09-03 17:57:25 +00:00
// Per-RoutingDomain accessors
2022-11-04 02:02:40 +00:00
fn make_peer_info(&self, routing_domain: RoutingDomain) -> Option<PeerInfo> {
2023-02-10 02:01:04 +00:00
self.operate(|_rti, e| e.make_peer_info(routing_domain))
2022-09-04 18:17:28 +00:00
}
2022-11-04 02:02:40 +00:00
fn node_info(&self, routing_domain: RoutingDomain) -> Option<NodeInfo> {
2022-10-19 01:53:45 +00:00
self.operate(|_rti, e| e.node_info(routing_domain).cloned())
}
2022-11-04 02:02:40 +00:00
fn signed_node_info_has_valid_signature(&self, routing_domain: RoutingDomain) -> bool {
2022-09-04 18:17:28 +00:00
self.operate(|_rti, e| {
e.signed_node_info(routing_domain)
2023-02-08 21:50:07 +00:00
.map(|sni| sni.has_any_signature())
2022-09-04 18:17:28 +00:00
.unwrap_or(false)
})
2022-09-03 17:57:25 +00:00
}
2022-12-17 01:07:28 +00:00
fn node_info_ts(&self, routing_domain: RoutingDomain) -> Timestamp {
2022-12-08 15:24:33 +00:00
self.operate(|_rti, e| {
e.signed_node_info(routing_domain)
.map(|sni| sni.timestamp())
2022-12-17 01:07:28 +00:00
.unwrap_or(0u64.into())
2022-12-08 15:24:33 +00:00
})
}
fn has_seen_our_node_info_ts(
&self,
routing_domain: RoutingDomain,
2022-12-17 01:07:28 +00:00
our_node_info_ts: Timestamp,
2022-12-08 15:24:33 +00:00
) -> bool {
self.operate(|_rti, e| e.has_seen_our_node_info_ts(routing_domain, our_node_info_ts))
2022-09-03 17:57:25 +00:00
}
2022-12-17 01:07:28 +00:00
fn set_our_node_info_ts(&self, routing_domain: RoutingDomain, seen_ts: Timestamp) {
2022-12-08 15:24:33 +00:00
self.operate_mut(|_rti, e| e.set_our_node_info_ts(routing_domain, seen_ts));
2022-09-03 17:57:25 +00:00
}
2022-11-04 02:02:40 +00:00
fn network_class(&self, routing_domain: RoutingDomain) -> Option<NetworkClass> {
2022-08-31 01:21:16 +00:00
self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.network_class))
2022-08-02 01:06:31 +00:00
}
2022-11-04 02:02:40 +00:00
fn outbound_protocols(&self, routing_domain: RoutingDomain) -> Option<ProtocolTypeSet> {
2022-08-31 01:21:16 +00:00
self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.outbound_protocols))
}
2022-11-04 02:02:40 +00:00
fn address_types(&self, routing_domain: RoutingDomain) -> Option<AddressTypeSet> {
2022-08-31 01:21:16 +00:00
self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.address_types))
}
2022-11-04 02:02:40 +00:00
fn node_info_outbound_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
2022-08-02 01:06:31 +00:00
let mut dif = DialInfoFilter::all();
2022-08-31 01:21:16 +00:00
if let Some(outbound_protocols) = self.outbound_protocols(routing_domain) {
2022-08-02 01:06:31 +00:00
dif = dif.with_protocol_type_set(outbound_protocols);
}
2022-08-31 01:21:16 +00:00
if let Some(address_types) = self.address_types(routing_domain) {
2022-08-02 01:06:31 +00:00
dif = dif.with_address_type_set(address_types);
}
dif
}
2022-11-04 02:02:40 +00:00
fn relay(&self, routing_domain: RoutingDomain) -> Option<NodeRef> {
self.operate_mut(|rti, e| {
2022-11-10 03:27:37 +00:00
e.signed_node_info(routing_domain)
.and_then(|n| n.relay_peer_info())
2023-02-10 02:01:04 +00:00
.and_then(|rpi| {
2022-11-10 03:27:37 +00:00
// If relay is ourselves, then return None, because we can't relay through ourselves
// and to contact this node we should have had an existing inbound connection
2023-02-10 02:01:04 +00:00
if rti.unlocked_inner.matches_own_node_id(&rpi.node_ids) {
2022-11-10 03:27:37 +00:00
return None;
}
2022-11-10 03:27:37 +00:00
// Register relay node and return noderef
2023-02-10 02:01:04 +00:00
rti.register_node_with_peer_info(
2022-11-10 03:27:37 +00:00
self.routing_table(),
routing_domain,
2023-02-10 02:01:04 +00:00
rpi,
2022-11-10 03:27:37 +00:00
false,
)
})
2022-05-11 01:49:42 +00:00
})
2022-04-21 00:49:16 +00:00
}
2022-09-03 17:57:25 +00:00
// Filtered accessors
2022-11-04 02:02:40 +00:00
fn first_filtered_dial_info_detail(&self) -> Option<DialInfoDetail> {
2022-09-03 17:57:25 +00:00
let routing_domain_set = self.routing_domain_set();
2022-09-04 18:17:28 +00:00
let dial_info_filter = self.dial_info_filter();
2022-11-04 02:02:40 +00:00
let (sort, dial_info_filter) = match self.common().sequencing {
2022-10-21 14:35:03 +00:00
Sequencing::NoPreference => (None, dial_info_filter),
Sequencing::PreferOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter,
),
Sequencing::EnsureOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter.filtered(
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()),
),
),
};
2022-08-31 01:21:16 +00:00
self.operate(|_rt, e| {
2022-09-03 17:57:25 +00:00
for routing_domain in routing_domain_set {
if let Some(ni) = e.node_info(routing_domain) {
2022-09-04 18:17:28 +00:00
let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
if let Some(did) = ni.first_filtered_dial_info_detail(sort, filter) {
2022-09-03 17:57:25 +00:00
return Some(did);
}
2022-04-19 15:23:44 +00:00
}
2022-09-03 17:57:25 +00:00
}
None
2022-04-19 15:23:44 +00:00
})
}
2022-11-04 02:02:40 +00:00
fn all_filtered_dial_info_details<F>(&self) -> Vec<DialInfoDetail> {
2022-09-03 17:57:25 +00:00
let routing_domain_set = self.routing_domain_set();
2022-09-04 18:17:28 +00:00
let dial_info_filter = self.dial_info_filter();
2022-11-04 02:02:40 +00:00
let (sort, dial_info_filter) = match self.common().sequencing {
2022-10-21 14:35:03 +00:00
Sequencing::NoPreference => (None, dial_info_filter),
Sequencing::PreferOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter,
),
Sequencing::EnsureOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter.filtered(
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()),
),
),
};
2022-04-19 15:23:44 +00:00
let mut out = Vec::new();
2022-08-31 01:21:16 +00:00
self.operate(|_rt, e| {
2022-09-03 17:57:25 +00:00
for routing_domain in routing_domain_set {
if let Some(ni) = e.node_info(routing_domain) {
2022-09-04 18:17:28 +00:00
let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
if let Some(did) = ni.first_filtered_dial_info_detail(sort, filter) {
2022-09-03 17:57:25 +00:00
out.push(did);
}
2022-05-11 01:49:42 +00:00
}
2022-04-19 15:23:44 +00:00
}
});
out.remove_duplicates();
2022-04-19 15:23:44 +00:00
out
}
2022-11-04 02:02:40 +00:00
fn last_connection(&self) -> Option<ConnectionDescriptor> {
// Get the last connections and the last time we saw anything with this connection
// Filtered first and then sorted by most recent
2022-11-04 02:02:40 +00:00
self.operate(|rti, e| {
2022-12-09 01:30:42 +00:00
let last_connections = e.last_connections(rti, true, self.common().filter.clone());
last_connections.first().map(|x| x.0)
2022-11-04 02:02:40 +00:00
})
2021-11-22 16:28:30 +00:00
}
2022-04-17 23:10:10 +00:00
2022-11-04 02:02:40 +00:00
fn clear_last_connections(&self) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|_rti, e| e.clear_last_connections())
2022-08-06 14:23:26 +00:00
}
2022-12-17 01:07:28 +00:00
fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: Timestamp) {
2022-11-04 02:02:40 +00:00
self.operate_mut(|rti, e| {
e.set_last_connection(connection_descriptor, ts);
2023-02-12 04:16:32 +00:00
rti.touch_recent_peer(e.best_node_id(), connection_descriptor);
2022-11-04 02:02:40 +00:00
})
2022-08-06 16:36:07 +00:00
}
2022-11-04 02:02:40 +00:00
fn has_any_dial_info(&self) -> bool {
2022-08-31 01:21:16 +00:00
self.operate(|_rti, e| {
for rtd in RoutingDomain::all() {
2022-11-10 03:27:37 +00:00
if let Some(sni) = e.signed_node_info(rtd) {
if sni.has_any_dial_info() {
2022-08-31 01:21:16 +00:00
return true;
}
}
}
false
})
}
2022-12-17 01:07:28 +00:00
fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|rti, e| {
2022-10-19 01:53:45 +00:00
rti.transfer_stats_accounting().add_up(bytes);
2022-08-31 01:21:16 +00:00
e.question_sent(ts, bytes, expects_answer);
})
}
2022-12-17 01:07:28 +00:00
fn stats_question_rcvd(&self, ts: Timestamp, bytes: ByteCount) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|rti, e| {
2022-10-19 01:53:45 +00:00
rti.transfer_stats_accounting().add_down(bytes);
2022-08-31 01:21:16 +00:00
e.question_rcvd(ts, bytes);
})
}
2022-12-17 01:07:28 +00:00
fn stats_answer_sent(&self, bytes: ByteCount) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|rti, e| {
2022-10-19 01:53:45 +00:00
rti.transfer_stats_accounting().add_up(bytes);
2022-08-31 01:21:16 +00:00
e.answer_sent(bytes);
})
}
2022-12-17 01:07:28 +00:00
fn stats_answer_rcvd(&self, send_ts: Timestamp, recv_ts: Timestamp, bytes: ByteCount) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|rti, e| {
2022-10-19 01:53:45 +00:00
rti.transfer_stats_accounting().add_down(bytes);
rti.latency_stats_accounting()
2022-12-10 18:36:26 +00:00
.record_latency(recv_ts.saturating_sub(send_ts));
2022-08-31 01:21:16 +00:00
e.answer_rcvd(send_ts, recv_ts, bytes);
})
}
2022-11-04 02:02:40 +00:00
fn stats_question_lost(&self) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|_rti, e| {
e.question_lost();
})
}
2022-12-17 01:07:28 +00:00
fn stats_failed_to_send(&self, ts: Timestamp, expects_answer: bool) {
2022-08-31 01:21:16 +00:00
self.operate_mut(|_rti, e| {
e.failed_to_send(ts, expects_answer);
2022-05-11 01:49:42 +00:00
})
2022-04-17 17:28:39 +00:00
}
2021-11-22 16:28:30 +00:00
}
2022-11-04 02:02:40 +00:00
////////////////////////////////////////////////////////////////////////////////////
/// Reference to a routing table entry
/// Keeps entry in the routing table until all references are gone
///
/// Creating or cloning a `NodeRef` increments the entry's `ref_count`; dropping one
/// decrements it and queues a bucket kick once the count reaches zero.
pub struct NodeRef {
    /// Routing table handle, bucket entry, filter and sequencing for this reference
    common: NodeRefBaseCommon,
}
impl NodeRef {
pub fn new(
routing_table: RoutingTable,
entry: Arc<BucketEntry>,
filter: Option<NodeRefFilter>,
) -> Self {
entry.ref_count.fetch_add(1u32, Ordering::Relaxed);
2022-05-25 15:12:19 +00:00
2022-06-25 14:57:33 +00:00
Self {
2022-11-04 02:02:40 +00:00
common: NodeRefBaseCommon {
routing_table,
entry,
filter,
sequencing: Sequencing::NoPreference,
#[cfg(feature = "tracking")]
track_id: entry.track(),
},
2022-06-25 14:57:33 +00:00
}
2021-11-22 16:28:30 +00:00
}
2022-11-04 02:02:40 +00:00
pub fn filtered_clone(&self, filter: NodeRefFilter) -> Self {
let mut out = self.clone();
out.merge_filter(filter);
out
}
2022-11-09 22:11:35 +00:00
pub fn locked<'a>(&self, rti: &'a RoutingTableInner) -> NodeRefLocked<'a> {
2022-11-04 02:02:40 +00:00
NodeRefLocked::new(rti, self.clone())
}
2022-11-09 22:11:35 +00:00
pub fn locked_mut<'a>(&self, rti: &'a mut RoutingTableInner) -> NodeRefLockedMut<'a> {
NodeRefLockedMut::new(rti, self.clone())
}
2021-11-22 16:28:30 +00:00
}
2022-11-04 02:02:40 +00:00
impl NodeRefBase for NodeRef {
fn common(&self) -> &NodeRefBaseCommon {
&self.common
}
fn common_mut(&mut self) -> &mut NodeRefBaseCommon {
&mut self.common
}
2022-04-17 17:28:39 +00:00
2022-11-04 02:02:40 +00:00
fn operate<T, F>(&self, f: F) -> T
where
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T,
{
let inner = &*self.common.routing_table.inner.read();
self.common.entry.with(inner, f)
}
fn operate_mut<T, F>(&self, f: F) -> T
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
{
let inner = &mut *self.common.routing_table.inner.write();
self.common.entry.with_mut(inner, f)
}
}
impl Clone for NodeRef {
    /// Produces another reference to the same bucket entry, carrying over the
    /// routing table handle, filter and sequencing.
    fn clone(&self) -> Self {
        let src = &self.common;

        // Count the new reference before it is constructed
        src.entry.ref_count.fetch_add(1u32, Ordering::Relaxed);

        let common = NodeRefBaseCommon {
            routing_table: src.routing_table.clone(),
            entry: src.entry.clone(),
            filter: src.filter.clone(),
            sequencing: src.sequencing,
            #[cfg(feature = "tracking")]
            track_id: src.entry.write().track(),
        };
        Self { common }
    }
}
2022-04-17 17:28:39 +00:00
2022-05-25 15:12:19 +00:00
impl fmt::Display for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2023-02-12 04:16:32 +00:00
write!(f, "{}", self.common.entry.with_inner(|e| e.best_node_id()))
2022-05-25 15:12:19 +00:00
}
}
2021-11-22 16:28:30 +00:00
impl fmt::Debug for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2022-05-25 15:12:19 +00:00
f.debug_struct("NodeRef")
2023-02-12 04:16:32 +00:00
.field("node_ids", &self.common.entry.with_inner(|e| e.node_ids()))
2022-11-04 02:02:40 +00:00
.field("filter", &self.common.filter)
.field("sequencing", &self.common.sequencing)
2022-05-25 15:12:19 +00:00
.finish()
2021-11-22 16:28:30 +00:00
}
}
impl Drop for NodeRef {
fn drop(&mut self) {
2022-05-25 15:12:19 +00:00
#[cfg(feature = "tracking")]
2022-11-04 02:02:40 +00:00
self.common.entry.write().untrack(self.track_id);
2022-06-25 14:57:33 +00:00
// drop the noderef and queue a bucket kick if it was the last one
2022-11-04 02:02:40 +00:00
let new_ref_count = self
.common
.entry
.ref_count
.fetch_sub(1u32, Ordering::Relaxed)
- 1;
2022-06-25 14:57:33 +00:00
if new_ref_count == 0 {
2023-02-12 04:16:32 +00:00
// get node ids with inner unlocked because nothing could be referencing this entry now
// and we don't know when it will get dropped, possibly inside a lock
let node_ids = self.common().entry.with_inner(|e| e.node_ids());
self.common.routing_table.queue_bucket_kicks(node_ids);
2022-11-04 02:02:40 +00:00
}
}
}
////////////////////////////////////////////////////////////////////////////////////
/// Locked reference to a routing table entry
/// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone
pub struct NodeRefLocked<'a> {
2022-11-09 22:11:35 +00:00
inner: Mutex<&'a RoutingTableInner>,
2022-11-04 02:02:40 +00:00
nr: NodeRef,
}
impl<'a> NodeRefLocked<'a> {
2022-11-09 22:11:35 +00:00
pub fn new(inner: &'a RoutingTableInner, nr: NodeRef) -> Self {
2022-11-04 02:02:40 +00:00
Self {
inner: Mutex::new(inner),
nr,
2022-06-25 14:57:33 +00:00
}
2021-11-22 16:28:30 +00:00
}
}
2022-11-04 02:02:40 +00:00
impl<'a> NodeRefBase for NodeRefLocked<'a> {
    fn common(&self) -> &NodeRefBaseCommon {
        &self.nr.common
    }
    fn common_mut(&mut self) -> &mut NodeRefBaseCommon {
        &mut self.nr.common
    }

    /// Runs `f` against the routing table inner borrowed at construction time.
    fn operate<T, F>(&self, f: F) -> T
    where
        F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T,
    {
        let inner = &*self.inner.lock();
        self.nr.common.entry.with(inner, f)
    }

    /// Not supported on a read-only locked reference.
    ///
    /// # Panics
    ///
    /// Always panics; any trait method built on `operate_mut` requires a
    /// `NodeRefLockedMut` (see `NodeRef::locked_mut`) instead.
    fn operate_mut<T, F>(&self, _f: F) -> T
    where
        F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
    {
        panic!("need to locked_mut() for this operation")
    }
}
impl<'a> fmt::Display for NodeRefLocked<'a> {
    /// Displays exactly as the wrapped `NodeRef` does.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let wrapped = &self.nr;
        f.write_fmt(format_args!("{}", wrapped))
    }
}
impl<'a> fmt::Debug for NodeRefLocked<'a> {
    /// Debug output is a `NodeRefLocked` struct with the wrapped reference as its only field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("NodeRefLocked");
        builder.field("nr", &self.nr);
        builder.finish()
    }
}
2022-11-09 22:11:35 +00:00
////////////////////////////////////////////////////////////////////////////////////
/// Mutable locked reference to a routing table entry
/// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone
///
/// Holds the routing table inner by mutable borrow, so both `operate` and
/// `operate_mut` are available on this type.
pub struct NodeRefLockedMut<'a> {
    /// Mutably borrowed routing table inner state, handed to `operate` / `operate_mut` closures
    inner: Mutex<&'a mut RoutingTableInner>,
    /// The underlying counted reference
    nr: NodeRef,
}
impl<'a> NodeRefLockedMut<'a> {
    /// Pairs an already mutably borrowed routing table inner with a node reference.
    pub fn new(inner: &'a mut RoutingTableInner, nr: NodeRef) -> Self {
        let inner = Mutex::new(inner);
        Self { inner, nr }
    }
}
impl<'a> NodeRefBase for NodeRefLockedMut<'a> {
    fn common(&self) -> &NodeRefBaseCommon {
        let nr = &self.nr;
        &nr.common
    }
    fn common_mut(&mut self) -> &mut NodeRefBaseCommon {
        let nr = &mut self.nr;
        &mut nr.common
    }

    /// Runs `f` with shared access to the routing table inner borrowed at construction time.
    fn operate<T, F>(&self, f: F) -> T
    where
        F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T,
    {
        // Hold the guard for the whole call, then reborrow through it
        let guard = self.inner.lock();
        let rti: &RoutingTableInner = &**guard;
        self.nr.common.entry.with(rti, f)
    }

    /// Runs `f` with exclusive access to the routing table inner borrowed at construction time.
    fn operate_mut<T, F>(&self, f: F) -> T
    where
        F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
    {
        // Hold the guard for the whole call, then reborrow mutably through it
        let mut guard = self.inner.lock();
        let rti: &mut RoutingTableInner = &mut **guard;
        self.nr.common.entry.with_mut(rti, f)
    }
}
impl<'a> fmt::Display for NodeRefLockedMut<'a> {
    /// Displays exactly as the wrapped `NodeRef` does.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let wrapped = &self.nr;
        f.write_fmt(format_args!("{}", wrapped))
    }
}
impl<'a> fmt::Debug for NodeRefLockedMut<'a> {
    /// Debug output is a `NodeRefLockedMut` struct with the wrapped reference as its only field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("NodeRefLockedMut");
        builder.field("nr", &self.nr);
        builder.finish()
    }
}