diff --git a/scripts/run_3.sh b/scripts/run_3.sh new file mode 100755 index 00000000..04d1a01e --- /dev/null +++ b/scripts/run_3.sh @@ -0,0 +1,4 @@ +#!/bin/bash +exec ./run_local_test.py 3 --config-file ./local-test.yml $@ + + diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 11d5f2e6..00916440 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -404,42 +404,43 @@ struct Operation { respondTo :union { none @1 :Void; # no response is desired - sender @2 :NodeInfo; # (Optional) some envelope-sender node info to be used for reply (others may exist via findNodeQ) - privateRoute @3 :PrivateRoute; # embedded private route to be used for reply + sender @2 :Void; # sender without node info + senderWithInfo @3 :NodeInfo; # some envelope-sender node info to be used for reply + privateRoute @4 :PrivateRoute; # embedded private route to be used for reply } detail :union { # Direct operations - infoQ @4 :OperationInfoQ; - infoA @5 :OperationInfoA; - validateDialInfo @6 :OperationValidateDialInfo; - findNodeQ @7 :OperationFindNodeQ; - findNodeA @8 :OperationFindNodeA; - route @9 :OperationRoute; + infoQ @5 :OperationInfoQ; + infoA @6 :OperationInfoA; + validateDialInfo @7 :OperationValidateDialInfo; + findNodeQ @8 :OperationFindNodeQ; + findNodeA @9 :OperationFindNodeA; + route @10 :OperationRoute; # Routable operations - getValueQ @10 :OperationGetValueQ; - getValueA @11 :OperationGetValueA; - setValueQ @12 :OperationSetValueQ; - setValueA @13 :OperationSetValueA; - watchValueQ @14 :OperationWatchValueQ; - watchValueA @15 :OperationWatchValueA; - valueChanged @16 :OperationValueChanged; + getValueQ @11 :OperationGetValueQ; + getValueA @12 :OperationGetValueA; + setValueQ @13 :OperationSetValueQ; + setValueA @14 :OperationSetValueA; + watchValueQ @15 :OperationWatchValueQ; + watchValueA @16 :OperationWatchValueA; + valueChanged @17 :OperationValueChanged; - supplyBlockQ @17 :OperationSupplyBlockQ; - supplyBlockA @18 :OperationSupplyBlockA; - findBlockQ @19 :OperationFindBlockQ; - findBlockA @20 :OperationFindBlockA; + supplyBlockQ @18 :OperationSupplyBlockQ; + supplyBlockA @19 :OperationSupplyBlockA; + findBlockQ @20 :OperationFindBlockQ; + findBlockA @21 :OperationFindBlockA; - signal @21 :OperationSignal; - returnReceipt @22 :OperationReturnReceipt; + signal @22 :OperationSignal; + returnReceipt @23 :OperationReturnReceipt; # Tunnel operations - startTunnelQ @23 :OperationStartTunnelQ; - startTunnelA @24 :OperationStartTunnelA; - completeTunnelQ @25 :OperationCompleteTunnelQ; - completeTunnelA @26 :OperationCompleteTunnelA; - cancelTunnelQ @27 :OperationCancelTunnelQ; - cancelTunnelA @28 :OperationCancelTunnelA; + startTunnelQ @24 :OperationStartTunnelQ; + startTunnelA @25 :OperationStartTunnelA; + completeTunnelQ @26 :OperationCompleteTunnelQ; + completeTunnelA @27 :OperationCompleteTunnelA; + cancelTunnelQ @28 :OperationCancelTunnelQ; + cancelTunnelA @29 :OperationCancelTunnelA; } } diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index 5bbe2e1f..4c7b9e16 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -732,6 +732,11 @@ impl NetworkManager { return Ok(ContactMethod::OutboundRelay(relay_node)); } // Otherwise, we can't reach this node + debug!( + "unable to reach node {:?}: {}", + target_node_ref, + target_node_ref.operate(|e| format!("{:#?}", e)) + ); Ok(ContactMethod::Unreachable) } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 7219d93e..b6d00387 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -416,29 +416,41 @@ impl RoutingTable { } } - pub fn create_node_ref(&self, node_id: DHTKey) -> Result { + // Create a node reference, possibly creating a bucket entry + // the 'update_func' closure is called on the node, and, if created, + // in a locked fashion as to ensure the bucket entry state is always valid + pub fn create_node_ref(&self, node_id: DHTKey, update_func: F) -> Result + where + F: FnOnce(&mut BucketEntry), + { // Ensure someone isn't trying register this node itself if node_id == self.node_id() { return Err("can't register own node".to_owned()).map_err(logthru_rtab!(error)); } - // Insert into bucket, possibly evicting the newest bucket member - let noderef = match self.lookup_node_ref(node_id) { + // Lock this entire operation + let mut inner = self.inner.lock(); + + // Look up existing entry + let idx = Self::find_bucket_index(&*inner, node_id); + let noderef = { + let bucket = &mut inner.buckets[idx]; + let entry = bucket.entry_mut(&node_id); + entry.map(|e| NodeRef::new(self.clone(), node_id, e, None)) + }; + + // If one doesn't exist, insert into bucket, possibly evicting a bucket member + let noderef = match noderef { None => { // Make new entry - let mut inner = self.inner.lock(); - let idx = Self::find_bucket_index(&*inner, node_id); - let nr = { - // Get the bucket for the entry - let bucket = &mut inner.buckets[idx]; - // Add new entry - let nr = bucket.add_entry(node_id); + inner.bucket_entry_count += 1; + log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count); + let bucket = &mut inner.buckets[idx]; + let nr = bucket.add_entry(node_id); - // Update count - inner.bucket_entry_count += 1; - log_rtab!("Routing table now has {} nodes", inner.bucket_entry_count); - nr - }; + // Update the entry + let entry = bucket.entry_mut(&node_id); + update_func(entry.unwrap()); // Kick the bucket // It is important to do this in the same inner lock as the add_entry @@ -446,7 +458,14 @@ impl RoutingTable { nr } - Some(nr) => nr, + Some(nr) => { + // Update the entry + let bucket = &mut inner.buckets[idx]; + let entry = bucket.entry_mut(&node_id); + update_func(entry.unwrap()); + + nr + } }; Ok(noderef) @@ -468,10 +487,8 @@ impl RoutingTable { node_id: DHTKey, node_info: NodeInfo, ) -> Result { - let nr = self.create_node_ref(node_id)?; - nr.operate(move |e| -> Result<(), String> { + let nr = self.create_node_ref(node_id, |e| { e.update_node_info(node_info); - Ok(()) })?; Ok(nr) @@ -485,24 +502,34 @@ impl RoutingTable { descriptor: ConnectionDescriptor, timestamp: u64, ) -> Result { - let nr = self.create_node_ref(node_id)?; - nr.operate(move |e| { + let nr = self.create_node_ref(node_id, |e| { // set the most recent node address for connection finding and udp replies e.set_last_connection(descriptor, timestamp); - }); + })?; Ok(nr) } + fn operate_on_bucket_entry_locked( + inner: &mut RoutingTableInner, + node_id: DHTKey, + f: F, + ) -> T + where + F: FnOnce(&mut BucketEntry) -> T, + { + let idx = Self::find_bucket_index(&*inner, node_id); + let bucket = &mut inner.buckets[idx]; + let entry = bucket.entry_mut(&node_id).unwrap(); + f(entry) + } + fn operate_on_bucket_entry(&self, node_id: DHTKey, f: F) -> T where F: FnOnce(&mut BucketEntry) -> T, { let mut inner = self.inner.lock(); - let idx = Self::find_bucket_index(&*inner, node_id); - let bucket = &mut inner.buckets[idx]; - let entry = bucket.entry_mut(&node_id).unwrap(); - f(entry) + Self::operate_on_bucket_entry_locked(&mut *inner, node_id, f) } pub fn find_inbound_relay(&self, cur_ts: u64) -> Option { diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs index c9470c4f..82b4a163 100644 --- a/veilid-core/src/rpc_processor/debug.rs +++ b/veilid-core/src/rpc_processor/debug.rs @@ -125,6 +125,21 @@ impl RPCProcessor { let respond_to_str = match respond_to { veilid_capnp::operation::respond_to::None(_) => "(None)".to_owned(), veilid_capnp::operation::respond_to::Sender(_) => "Sender".to_owned(), + veilid_capnp::operation::respond_to::SenderWithInfo(ni) => { + let ni_reader = match ni { + Ok(nir) => nir, + Err(e) => { + return e.to_string(); + } + }; + let node_info = match decode_node_info(&ni_reader, true) { + Ok(ni) => ni, + Err(e) => { + return e.to_string(); + } + }; + format!("Sender({:?})", node_info) + } veilid_capnp::operation::respond_to::PrivateRoute(pr) => { let pr_reader = match pr { Ok(prr) => prr, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 935e2e23..6a6da9df 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -45,11 +45,11 @@ impl RespondTo { builder.set_none(()); } Self::Sender(Some(ni)) => { - let mut ni_builder = builder.reborrow().init_sender(); + let mut ni_builder = builder.reborrow().init_sender_with_info(); encode_node_info(ni, &mut ni_builder)?; } Self::Sender(None) => { - builder.reborrow().init_sender(); + builder.reborrow().set_sender(()); } Self::PrivateRoute(pr) => { let mut pr_builder = builder.reborrow().init_private_route(); @@ -616,7 +616,8 @@ impl RPCProcessor { return Err(rpc_error_internal("no response requested")) .map_err(logthru_rpc!()); } - veilid_capnp::operation::respond_to::Sender(_) => { + veilid_capnp::operation::respond_to::SenderWithInfo(_) + | veilid_capnp::operation::respond_to::Sender(_) => { // Respond to envelope source node, possibly through a relay if the request arrived that way // ------------------------------- match safety_route_spec { @@ -735,11 +736,15 @@ impl RPCProcessor { } fn wants_answer(&self, operation: &veilid_capnp::operation::Reader) -> Result { - match operation.get_respond_to().which() { - Ok(veilid_capnp::operation::respond_to::None(_)) => Ok(false), - Ok(veilid_capnp::operation::respond_to::Sender(_)) => Ok(true), - Ok(veilid_capnp::operation::respond_to::PrivateRoute(_)) => Ok(true), - _ => Err(rpc_error_internal("Unknown respond_to")), + match operation + .get_respond_to() + .which() + .map_err(map_error_capnp_notinschema!())? + { + veilid_capnp::operation::respond_to::None(_) => Ok(false), + veilid_capnp::operation::respond_to::Sender(_) + | veilid_capnp::operation::respond_to::SenderWithInfo(_) + | veilid_capnp::operation::respond_to::PrivateRoute(_) => Ok(true), } } @@ -747,15 +752,20 @@ impl RPCProcessor { &self, operation: &veilid_capnp::operation::Reader, ) -> Result, RPCError> { - if let veilid_capnp::operation::respond_to::Sender(Ok(sender_ni_reader)) = operation + match operation .get_respond_to() .which() .map_err(map_error_capnp_notinschema!())? { - // Sender DialInfo was specified, update our routing table with it - Ok(Some(decode_node_info(&sender_ni_reader, true)?)) - } else { - Ok(None) + veilid_capnp::operation::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => { + Ok(Some(decode_node_info(&sender_ni_reader, true)?)) + } + veilid_capnp::operation::respond_to::SenderWithInfo(Err(e)) => Err(rpc_error_protocol( + format!("invalid sender_with_info node info: {}", e), + )), + veilid_capnp::operation::respond_to::None(_) + | veilid_capnp::operation::respond_to::Sender(_) + | veilid_capnp::operation::respond_to::PrivateRoute(_) => Ok(None), } }