crash fixes
This commit is contained in:
parent
df931a9329
commit
e91044e33d
@ -12,6 +12,7 @@ crate-type = ["cdylib", "staticlib", "rlib"]
|
||||
[features]
|
||||
default = [ "enable-crypto-vld0" ]
|
||||
crypto-test = [ "enable-crypto-vld0", "enable-crypto-none" ]
|
||||
crypto-test-none = [ "enable-crypto-none" ]
|
||||
enable-crypto-vld0 = []
|
||||
enable-crypto-none = []
|
||||
rt-async-std = ["async-std", "async-std-resolver", "async_executors/async_std", "rtnetlink?/smol_socket", "veilid-tools/rt-async-std"]
|
||||
|
@ -126,19 +126,27 @@ impl BucketEntryInner {
|
||||
pub fn node_ids(&self) -> TypedKeySet {
|
||||
self.node_ids.clone()
|
||||
}
|
||||
|
||||
/// Add a node id for a particular crypto kind.
|
||||
/// Returns any previous existing node id associated with that crypto kind
|
||||
pub fn add_node_id(&mut self, node_id: TypedKey) -> Option<TypedKey> {
|
||||
/// Returns Ok(Some(node)) any previous existing node id associated with that crypto kind
|
||||
/// Returns Ok(None) if no previous existing node id was associated with that crypto kind
|
||||
/// Results Err() if this operation would add more crypto kinds than we support
|
||||
pub fn add_node_id(&mut self, node_id: TypedKey) -> EyreResult<Option<TypedKey>> {
|
||||
if let Some(old_node_id) = self.node_ids.get(node_id.kind) {
|
||||
// If this was already there we do nothing
|
||||
if old_node_id == node_id {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
// Won't change number of crypto kinds
|
||||
self.node_ids.add(node_id);
|
||||
return Ok(Some(old_node_id));
|
||||
}
|
||||
// Check to ensure we aren't adding more crypto kinds than we support
|
||||
if self.node_ids.len() == MAX_CRYPTO_KINDS {
|
||||
bail!("too many crypto kinds for this node");
|
||||
}
|
||||
self.node_ids.add(node_id);
|
||||
return Some(old_node_id);
|
||||
}
|
||||
self.node_ids.add(node_id);
|
||||
None
|
||||
Ok(None)
|
||||
}
|
||||
pub fn best_node_id(&self) -> TypedKey {
|
||||
self.node_ids.best().unwrap()
|
||||
|
@ -590,6 +590,12 @@ impl RoutingTable {
|
||||
|
||||
fn queue_bucket_kicks(&self, node_ids: TypedKeySet) {
|
||||
for node_id in node_ids.iter() {
|
||||
// Skip node ids we didn't add to buckets
|
||||
if !VALID_CRYPTO_KINDS.contains(&node_id.kind) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Put it in the kick queue
|
||||
let x = self.unlocked_inner.calculate_bucket_index(node_id);
|
||||
self.unlocked_inner.kick_queue.lock().insert(x);
|
||||
}
|
||||
|
@ -604,21 +604,33 @@ impl RoutingTableInner {
|
||||
}
|
||||
|
||||
// Update buckets with new node ids we may have learned belong to this entry
|
||||
fn update_bucket_entries(&mut self, entry: Arc<BucketEntry>, node_ids: &[TypedKey]) {
|
||||
fn update_bucket_entries(
|
||||
&mut self,
|
||||
entry: Arc<BucketEntry>,
|
||||
node_ids: &[TypedKey],
|
||||
) -> EyreResult<()> {
|
||||
entry.with_mut_inner(|e| {
|
||||
let existing_node_ids = e.node_ids();
|
||||
for node_id in node_ids {
|
||||
if !existing_node_ids.contains(node_id) {
|
||||
// Skip node ids that exist already
|
||||
if existing_node_ids.contains(node_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add new node id to entry
|
||||
if let Some(old_node_id) = e.add_node_id(*node_id) {
|
||||
let ck = node_id.kind;
|
||||
if let Some(old_node_id) = e.add_node_id(*node_id)? {
|
||||
// Remove any old node id for this crypto kind
|
||||
if VALID_CRYPTO_KINDS.contains(&ck) {
|
||||
let bucket_index = self.unlocked_inner.calculate_bucket_index(&old_node_id);
|
||||
let bucket = self.get_bucket_mut(bucket_index);
|
||||
bucket.remove_entry(&old_node_id.value);
|
||||
self.unlocked_inner.kick_queue.lock().insert(bucket_index);
|
||||
}
|
||||
}
|
||||
|
||||
// Bucket the entry appropriately
|
||||
if VALID_CRYPTO_KINDS.contains(&ck) {
|
||||
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
|
||||
let bucket = self.get_bucket_mut(bucket_index);
|
||||
bucket.add_existing_entry(node_id.value, entry.clone());
|
||||
@ -627,6 +639,7 @@ impl RoutingTableInner {
|
||||
self.unlocked_inner.kick_queue.lock().insert(bucket_index);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
@ -636,7 +649,7 @@ impl RoutingTableInner {
|
||||
fn create_node_ref<F>(
|
||||
&mut self,
|
||||
outer_self: RoutingTable,
|
||||
node_ids: &[TypedKey],
|
||||
node_ids: &TypedKeySet,
|
||||
update_func: F,
|
||||
) -> Option<NodeRef>
|
||||
where
|
||||
@ -651,11 +664,12 @@ impl RoutingTableInner {
|
||||
// Look up all bucket entries and make sure we only have zero or one
|
||||
// If we have more than one, pick the one with the best cryptokind to add node ids to
|
||||
let mut best_entry: Option<Arc<BucketEntry>> = None;
|
||||
for node_id in node_ids {
|
||||
for node_id in node_ids.iter() {
|
||||
// Ignore node ids we don't support
|
||||
if !VALID_CRYPTO_KINDS.contains(&node_id.kind) {
|
||||
log_rtab!(error "can't look up node id with invalid crypto kind");
|
||||
return None;
|
||||
continue;
|
||||
}
|
||||
// Find the first in crypto sort order
|
||||
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
|
||||
let bucket = self.get_bucket(bucket_index);
|
||||
if let Some(entry) = bucket.entry(&node_id.value) {
|
||||
@ -673,7 +687,10 @@ impl RoutingTableInner {
|
||||
// If the entry does exist already, update it
|
||||
if let Some(best_entry) = best_entry {
|
||||
// Update the entry with all of the node ids
|
||||
self.update_bucket_entries(best_entry.clone(), node_ids);
|
||||
if let Err(e) = self.update_bucket_entries(best_entry.clone(), node_ids) {
|
||||
log_rtab!(debug "Not registering new ids for existing node: {}", e);
|
||||
return None;
|
||||
}
|
||||
|
||||
// Make a noderef to return
|
||||
let nr = NodeRef::new(outer_self.clone(), best_entry.clone(), None);
|
||||
@ -694,7 +711,10 @@ impl RoutingTableInner {
|
||||
self.unlocked_inner.kick_queue.lock().insert(bucket_entry);
|
||||
|
||||
// Update the other bucket entries with the remaining node ids
|
||||
self.update_bucket_entries(new_entry.clone(), node_ids);
|
||||
if let Err(e) = self.update_bucket_entries(new_entry.clone(), node_ids) {
|
||||
log_rtab!(debug "Not registering new node: {}", e);
|
||||
return None;
|
||||
}
|
||||
|
||||
// Make node ref to return
|
||||
let nr = NodeRef::new(outer_self.clone(), new_entry.clone(), None);
|
||||
@ -832,7 +852,7 @@ impl RoutingTableInner {
|
||||
descriptor: ConnectionDescriptor,
|
||||
timestamp: Timestamp,
|
||||
) -> Option<NodeRef> {
|
||||
let out = self.create_node_ref(outer_self, &[node_id], |_rti, e| {
|
||||
let out = self.create_node_ref(outer_self, &TypedKeySet::from(node_id), |_rti, e| {
|
||||
// this node is live because it literally just connected to us
|
||||
e.touch_last_seen(timestamp);
|
||||
});
|
||||
|
@ -13,6 +13,7 @@ path = "src/main.rs"
|
||||
[features]
|
||||
default = [ "rt-tokio", "veilid-core/default" ]
|
||||
crypto-test = [ "rt-tokio", "veilid-core/crypto-test"]
|
||||
crypto-test-none = [ "rt-tokio", "veilid-core/crypto-test-none"]
|
||||
|
||||
rt-async-std = [ "veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys" ]
|
||||
rt-tokio = [ "veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio", "console-subscriber" ]
|
||||
|
Loading…
Reference in New Issue
Block a user