From e2c5691d7e48affe9a1f5d12b87b2605b8b061b3 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 5 May 2023 21:23:17 -0400 Subject: [PATCH] checkpoint --- veilid-core/src/crypto/types/crypto_typed.rs | 15 +- veilid-core/src/network_manager/mod.rs | 2 +- veilid-core/src/routing_table/mod.rs | 10 + .../src/routing_table/routing_table_inner.rs | 68 +++++- veilid-core/src/rpc_processor/fanout_call.rs | 222 ++++++++++++++++++ veilid-core/src/rpc_processor/mod.rs | 132 +++++------ .../src/storage_manager/do_get_value.rs | 107 +++++++++ veilid-core/src/storage_manager/mod.rs | 15 +- veilid-core/src/veilid_api/routing_context.rs | 7 +- veilid-flutter/rust/src/dart_ffi.rs | 2 +- veilid-wasm/src/lib.rs | 2 +- 11 files changed, 484 insertions(+), 98 deletions(-) create mode 100644 veilid-core/src/rpc_processor/fanout_call.rs create mode 100644 veilid-core/src/storage_manager/do_get_value.rs diff --git a/veilid-core/src/crypto/types/crypto_typed.rs b/veilid-core/src/crypto/types/crypto_typed.rs index b6575769..f37ac7ce 100644 --- a/veilid-core/src/crypto/types/crypto_typed.rs +++ b/veilid-core/src/crypto/types/crypto_typed.rs @@ -127,12 +127,17 @@ where type Err = VeilidAPIError; fn from_str(s: &str) -> Result { let b = s.as_bytes(); - if b.len() != (5 + K::encoded_len()) || b[4..5] != b":"[..] { - apibail_parse_error!("invalid typed key", s); + if b.len() == (5 + K::encoded_len()) && b[4..5] != b":"[..] { + let kind: CryptoKind = b[0..4].try_into().expect("should not fail to convert"); + let value = K::try_decode_bytes(&b[5..])?; + Ok(Self { kind, value }) + } else if b.len() == K::encoded_len() { + let kind = best_crypto_kind(); + let value = K::try_decode_bytes(b)?; + Ok(Self { kind, value }) + } else { + apibail_generic!("invalid cryptotyped format"); } - let kind: CryptoKind = b[0..4].try_into().expect("should not fail to convert"); - let value = K::try_decode_bytes(&b[5..])?; - Ok(Self { kind, value }) } } impl<'de, K> Deserialize<'de> for CryptoTyped diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 609fe0ee..6b71130d 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1400,7 +1400,7 @@ impl NetworkManager { let some_relay_nr = if self.check_client_whitelist(sender_id) { // Full relay allowed, do a full resolve_node - match rpc.resolve_node(recipient_id.value).await { + match rpc.resolve_node(recipient_id, SafetySelection::Unsafe(Sequencing::default())).await { Ok(v) => v, Err(e) => { log_net!(debug "failed to resolve recipient node for relay, dropping outbound relayed packet: {}" ,e); diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 50c4db74..8cdf65f7 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -964,6 +964,16 @@ impl RoutingTable { .find_closest_nodes(node_count, node_id, filters, transform) } + pub fn sort_and_clean_closest_noderefs( + &self, + node_id: TypedKey, + closest_nodes: &mut Vec, + ) { + self.inner + .read() + .sort_and_clean_closest_noderefs(node_id, closest_nodes) + } + #[instrument(level = "trace", skip(self), ret)] pub fn register_find_node_answer( &self, diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index c2c92708..e70e705d 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -1153,7 +1153,6 @@ impl RoutingTableInner { let vcrypto = self.unlocked_inner.crypto().get(crypto_kind).unwrap(); // Filter to ensure entries support the crypto kind in use - let filter = Box::new( move |_rti: &RoutingTableInner, opt_entry: Option>| { if let Some(entry) = opt_entry { @@ -1219,4 +1218,71 @@ impl RoutingTableInner { log_rtab!(">> find_closest_nodes: node count = {}", out.len()); out } + + pub fn sort_and_clean_closest_noderefs( + &self, + node_id: TypedKey, + closest_nodes: &mut Vec, + ) { + // Lock all noderefs + let kind = node_id.kind; + let mut closest_nodes_locked: Vec = closest_nodes + .iter() + .filter_map(|x| { + if x.node_ids().kinds().contains(&kind) { + Some(x.locked(self)) + } else { + None + } + }) + .collect(); + + // Sort closest + let sort = make_closest_noderef_sort(self.unlocked_inner.crypto(), node_id); + closest_nodes_locked.sort_by(sort); + + // Unlock noderefs + *closest_nodes = closest_nodes_locked.iter().map(|x| x.unlocked()).collect(); + } +} + +fn make_closest_noderef_sort( + crypto: Crypto, + node_id: TypedKey, +) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering { + let cur_ts = get_aligned_timestamp(); + let kind = node_id.kind; + // Get cryptoversion to check distance with + let vcrypto = crypto.get(node_id.kind).unwrap(); + + move |a: &NodeRefLocked, b: &NodeRefLocked| -> core::cmp::Ordering { + // same nodes are always the same + if a.same_entry(b) { + return core::cmp::Ordering::Equal; + } + + // reliable nodes come first, pessimistically treating our own node as unreliable + a.operate(|_rti, a_entry| { + b.operate(|_rti, b_entry| { + let ra = a_entry.check_reliable(cur_ts); + let rb = b_entry.check_reliable(cur_ts); + if ra != rb { + if ra { + return core::cmp::Ordering::Less; + } else { + return core::cmp::Ordering::Greater; + } + } + + // get keys + let a_key = a_entry.node_ids().get(node_id.kind).unwrap(); + let b_key = b_entry.node_ids().get(node_id.kind).unwrap(); + + // distance is the next metric, closer nodes first + let da = vcrypto.distance(&a_key.value, &node_id.value); + let db = vcrypto.distance(&b_key.value, &node_id.value); + da.cmp(&db) + }) + }) + } } diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs new file mode 100644 index 00000000..ff0617e1 --- /dev/null +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -0,0 +1,222 @@ +use super::*; + +struct FanoutContext +where + R: Unpin, +{ + closest_nodes: Vec, + called_nodes: TypedKeySet, + result: Option>, +} + +pub type FanoutCallReturnType = Result>, RPCError>; + +pub struct FanoutCall +where + R: Unpin, + F: Future, + C: Fn(NodeRef) -> F, + D: Fn(&[NodeRef]) -> Option, +{ + routing_table: RoutingTable, + crypto_kind: CryptoKind, + node_id: TypedKey, + context: Mutex>, + count: usize, + fanout: usize, + timeout_us: TimestampDuration, + call_routine: C, + check_done: D, +} + +impl FanoutCall +where + R: Unpin, + F: Future, + C: Fn(NodeRef) -> F, + D: Fn(&[NodeRef]) -> Option, +{ + pub fn new( + routing_table: RoutingTable, + node_id: TypedKey, + count: usize, + fanout: usize, + timeout_us: TimestampDuration, + call_routine: C, + check_done: D, + ) -> Arc { + let context = Mutex::new(FanoutContext { + closest_nodes: Vec::with_capacity(count), + called_nodes: TypedKeySet::new(), + result: None, + }); + + Arc::new(Self { + routing_table, + node_id, + crypto_kind: node_id.kind, + context, + count, + fanout, + timeout_us, + call_routine, + check_done, + }) + } + + fn add_new_nodes(self: Arc, new_nodes: Vec) { + let mut ctx = self.context.lock(); + + for nn in new_nodes { + let mut dup = false; + for cn in &ctx.closest_nodes { + if cn.same_entry(&nn) { + dup = true; + } + } + if !dup { + ctx.closest_nodes.push(nn.clone()); + } + } + + self.routing_table + .sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes); + ctx.closest_nodes.truncate(self.count); + } + + fn remove_node(self: Arc, dead_node: NodeRef) { + let mut ctx = self.context.lock(); + for n in 0..ctx.closest_nodes.len() { + let cn = &ctx.closest_nodes[n]; + if cn.same_entry(&dead_node) { + ctx.closest_nodes.remove(n); + break; + } + } + } + + fn get_next_node(self: Arc) -> Option { + let mut next_node = None; + let mut ctx = self.context.lock(); + for cn in &ctx.closest_nodes { + if let Some(key) = cn.node_ids().get(self.crypto_kind) { + if !ctx.called_nodes.contains(&key) { + // New fanout call candidate found + next_node = Some(cn.clone()); + ctx.called_nodes.add(key); + } + } + } + next_node + } + + fn evaluate_done(self: Arc) -> bool { + let mut ctx = self.context.lock(); + + // If we have a result, then we're done + if ctx.result.is_some() { + return true; + } + + // Check for a new done result + ctx.result = (self.check_done)(&ctx.closest_nodes).map(|o| Ok(o)); + ctx.result.is_some() + } + + async fn fanout_processor(self: Arc) { + // Check to see if we have a result or are done + while !self.clone().evaluate_done() { + // Get the closest node we haven't processed yet + let next_node = self.clone().get_next_node(); + + // If we don't have a node to process, stop fanning out + let Some(next_node) = next_node else { + return; + }; + + // Do the call for this node + match (self.call_routine)(next_node.clone()).await { + Ok(Some(v)) => { + // Call succeeded + // Register the returned nodes and add them to the closest nodes list in sorted order + let new_nodes = self + .routing_table + .register_find_node_answer(self.crypto_kind, v); + self.clone().add_new_nodes(new_nodes); + } + Ok(None) => { + // Call failed, remove the node so it isn't included in the output + self.clone().remove_node(next_node); + } + Err(e) => { + // Error happened, abort everything and return the error + } + }; + } + } + + fn init_closest_nodes(self: Arc) { + // Get the 'count' closest nodes to the key out of our routing table + let closest_nodes = { + let routing_table = self.routing_table.clone(); + + let filter = Box::new( + move |rti: &RoutingTableInner, opt_entry: Option>| { + // Exclude our own node + if opt_entry.is_none() { + return false; + } + + // Ensure only things that are valid/signed in the PublicInternet domain are returned + rti.filter_has_valid_signed_node_info( + RoutingDomain::PublicInternet, + true, + opt_entry, + ) + }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([filter]); + + let transform = |_rti: &RoutingTableInner, v: Option>| { + NodeRef::new(routing_table.clone(), v.unwrap().clone(), None) + }; + + routing_table.find_closest_nodes(self.count, self.node_id, filters, transform) + }; + + let mut ctx = self.context.lock(); + ctx.closest_nodes = closest_nodes; + } + + pub async fn run(self: Arc) -> TimeoutOr, RPCError>> { + // Initialize closest nodes list + self.clone().init_closest_nodes(); + + // Do a quick check to see if we're already done + if self.clone().evaluate_done() { + let mut ctx = self.context.lock(); + return TimeoutOr::value(ctx.result.take().transpose()); + } + + // If not, do the fanout + let mut unord = FuturesUnordered::new(); + { + // Spin up 'fanout' tasks to process the fanout + for _ in 0..self.fanout { + let h = self.clone().fanout_processor(); + unord.push(h); + } + } + // Wait for them to complete + timeout((self.timeout_us.as_u64() / 1000u64) as u32, async { + while let Some(_) = unord.next().await {} + }) + .await + .into_timeout_or() + .map(|_| { + // Finished, return whatever value we came up with + let mut ctx = self.context.lock(); + ctx.result.take().transpose() + }) + } +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 87286060..702fd73c 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1,5 +1,6 @@ mod coders; mod destination; +mod fanout_call; mod operation_waiter; mod rpc_app_call; mod rpc_app_message; @@ -22,6 +23,7 @@ mod rpc_watch_value; pub use coders::*; pub use destination::*; +pub use fanout_call::*; pub use operation_waiter::*; pub use rpc_error::*; pub use rpc_status::*; @@ -399,90 +401,64 @@ impl RPCProcessor { /// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference /// If no node was found in the timeout, this returns None - pub async fn search_dht_single_key( + async fn search_dht_single_key( &self, node_id: TypedKey, count: usize, fanout: usize, timeout_us: TimestampDuration, - ) -> Result, RPCError> { + safety_selection: SafetySelection, + ) -> TimeoutOr, RPCError>> { let routing_table = self.routing_table(); - let filter = Box::new( - move |rti: &RoutingTableInner, opt_entry: Option>| { - // Exclude our own node - if opt_entry.is_none() { - return false; + // Routine to call to generate fanout + let call_routine = |next_node: NodeRef| { + let this = self.clone(); + async move { + match this + .clone() + .rpc_call_find_node( + Destination::direct(next_node).with_safety(safety_selection), + node_id, + ) + .await + { + Ok(v) => { + let v = network_result_value_or_log!(v => { + // Any other failures, just try the next node + return Ok(None); + }); + Ok(Some(v.answer)) + } + Err(e) => Err(e), } - - // Ensure only things that are valid/signed in the PublicInternet domain are returned - rti.filter_has_valid_signed_node_info( - RoutingDomain::PublicInternet, - true, - opt_entry, - ) - }, - ) as RoutingTableEntryFilter; - let filters = VecDeque::from([filter]); - - let transform = |_rti: &RoutingTableInner, v: Option>| { - NodeRef::new(routing_table.clone(), v.unwrap().clone(), None) + } }; - // Get the 'count' closest nodes to the key out of our routing table - let closest_nodes = routing_table.find_closest_nodes(count, node_id, filters, transform); - - // If the node we want to locate is one of the closest nodes, return it immediately - if let Some(out) = closest_nodes - .iter() - .find(|x| x.node_ids().contains(&node_id)) - { - return Ok(Some(out.clone())); - } - - // Make accessible to fanout tasks - struct FanoutContext { - closest_nodes: Vec, - called_nodes: TypedKeySet, - } - let closest_nodes = Arc::new(Mutex::new(closest_nodes)); - - // Otherwise contact the 'fanout' closest nodes to see if there's closer nodes - let mut unord = FuturesUnordered::new(); - { - // Spin up 'fanout' tasks to process the fanout - for n in 0..4 { - // Fanout processor - let closest_nodes = closest_nodes.clone(); - let h = async move { - // Find the nth node to iterate on - let cn = closest_nodes.lock(); - let n = n.clamp(0, cn.len()); xxx dont do this, use called nodes set, shouldnt need stop token canceller, but maybe at the top level? nothing is spawning. so maybe not. - let mut node = - - }; - unord.push(h); + // Routine to call to check if we're done at each step + let check_done = |closest_nodes: &[NodeRef]| { + // If the node we want to locate is one of the closest nodes, return it immediately + if let Some(out) = closest_nodes + .iter() + .find(|x| x.node_ids().contains(&node_id)) + { + return Some(out.clone()); } - } - // Wait for them to complete - timeout((timeout_us.as_u64() / 1000u64) as u32, async { - while let Some(_) = unord.next().await {} - }) - .await; + None + }; - Ok(None) - } + // Call the fanout + let fanout_call = FanoutCall::new( + routing_table.clone(), + node_id, + count, + fanout, + timeout_us, + call_routine, + check_done, + ); - /// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references - pub async fn search_dht_multi_key( - &self, - _node_id: TypedKey, - _count: usize, - _fanout: usize, - _timeout: TimestampDuration, - ) -> Result, RPCError> { - // xxx return closest nodes after the timeout - Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error)) + fanout_call.run().await } /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference @@ -490,6 +466,7 @@ impl RPCProcessor { pub fn resolve_node( &self, node_id: TypedKey, + safety_selection: SafetySelection, ) -> SendPinBoxFuture, RPCError>> { let this = self.clone(); Box::pin(async move { @@ -515,9 +492,16 @@ impl RPCProcessor { }; // Search in preferred cryptosystem order - let nr = this - .search_dht_single_key(node_id, count, fanout, timeout) - .await?; + let nr = match this + .search_dht_single_key(node_id, count, fanout, timeout, safety_selection) + .await + { + TimeoutOr::Timeout => None, + TimeoutOr::Value(Ok(v)) => v, + TimeoutOr::Value(Err(e)) => { + return Err(e); + } + }; if let Some(nr) = &nr { if nr.node_ids().contains(&node_id) { diff --git a/veilid-core/src/storage_manager/do_get_value.rs b/veilid-core/src/storage_manager/do_get_value.rs new file mode 100644 index 00000000..5d9fd846 --- /dev/null +++ b/veilid-core/src/storage_manager/do_get_value.rs @@ -0,0 +1,107 @@ +use super::*; + +pub struct DoGetValueResult { + pub value: Option, + pub descriptor: Option, +} + +impl StorageManager { + + pub async fn do_get_value( + &self, + mut inner: AsyncMutexGuardArc, + key: TypedKey, + subkey: ValueSubkey, + min_seq: ValueSeqNum, + last_descriptor: Option, + safety_selection: SafetySelection, + ) -> Result, VeilidAPIError> { + let Some(rpc_processor) = inner.rpc_processor.clone() else { + apibail_not_initialized!(); + }; + + let routing_table = rpc_processor.routing_table(); + + // Get the DHT parameters for 'GetValue' + let (count, fanout, timeout) = { + let c = self.unlocked_inner.config.get(); + ( + c.network.dht.get_value_count as usize, + c.network.dht.get_value_fanout as usize, + TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)), + ) + }; + + // Routine to call to generate fanout + let call_routine = |next_node: NodeRef| { + let rpc_processor = rpc_processor.clone(); + async move { + match rpc_processor + .clone() + .rpc_call_get_value( + Destination::direct(next_node).with_safety(safety_selection), + key, subkey, last_descriptor + ) + .await + { + Ok(v) => { + let v = network_result_value_or_log!(v => { + // Any other failures, just try the next node + return Ok(None); + }); + + // Keep the value if we got one and it is newer and it passes schema validation + if let Some(value) = v.answer.value { + // See if this is even a candidate + if value.value_data(). xxx apply min_seq and also to OperationGetValueQ + // Validate with scheam + } + + // Return peers if we have some + Ok(Some(v.answer.peers)) + } + Err(e) => Err(e), + } + } + }; + + // Routine to call to check if we're done at each step + let check_done = |closest_nodes: &[NodeRef]| { + // If the node we want to locate is one of the closest nodes, return it immediately + if let Some(out) = closest_nodes + .iter() + .find(|x| x.node_ids().contains(&node_id)) + { + return Some(out.clone()); + } + None + }; + + // Call the fanout + let fanout_call = FanoutCall::new( + routing_table.clone(), + node_id, + count, + fanout, + timeout_us, + call_routine, + check_done, + ); + + fanout_call.run().await + + // Search in preferred cryptosystem order + let nr = this + .search_dht_single_key(node_id, count, fanout, timeout, safety_selection) + .await?; + + if let Some(nr) = &nr { + if nr.node_ids().contains(&node_id) { + // found a close node, but not exact within our configured resolve_node timeout + return Ok(None); + } + } + + Ok(nr) + } +} diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 4dec1ce2..ef03e04c 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,3 +1,4 @@ +mod do_get_value; mod keys; mod record_store; mod record_store_limits; @@ -265,18 +266,6 @@ impl StorageManager { .await } - async fn do_get_value( - &self, - mut inner: AsyncMutexGuardArc, - key: TypedKey, - subkey: ValueSubkey, - ) -> Result, VeilidAPIError> { - let Some(rpc_processor) = inner.rpc_processor.clone() else { - apibail_not_initialized!(); - }; - - // - } async fn open_record_inner( &self, mut inner: AsyncMutexGuardArc, @@ -334,7 +323,7 @@ impl StorageManager { Ok(descriptor) } else { // No record yet, try to get it from the network - self.do_get_value(inner, key, 0).await + self.do_get_value(inner, key, 0, safety_selection).await // Make DHT Record Descriptor to return // let descriptor = DHTRecordDescriptor { diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index ed950234..fd06b286 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -4,7 +4,7 @@ use super::*; #[derive(Clone, Debug)] pub enum Target { - NodeId(PublicKey), // Node by any of its public keys + NodeId(TypedKey), // Node by its public key PrivateRoute(RouteId), // Remote private route by its id } @@ -105,7 +105,10 @@ impl RoutingContext { match target { Target::NodeId(node_id) => { // Resolve node - let mut nr = match rpc_processor.resolve_node(node_id).await { + let mut nr = match rpc_processor + .resolve_node(node_id, self.unlocked_inner.safety_selection) + .await + { Ok(Some(nr)) => nr, Ok(None) => apibail_invalid_target!(), Err(e) => return Err(e.into()), diff --git a/veilid-flutter/rust/src/dart_ffi.rs b/veilid-flutter/rust/src/dart_ffi.rs index 4650844d..a3c96ff1 100644 --- a/veilid-flutter/rust/src/dart_ffi.rs +++ b/veilid-flutter/rust/src/dart_ffi.rs @@ -74,7 +74,7 @@ async fn parse_target(s: String) -> APIResult { } // Is this a node id? - if let Ok(nid) = veilid_core::PublicKey::from_str(&s) { + if let Ok(nid) = veilid_core::TypedKey::from_str(&s) { return Ok(veilid_core::Target::NodeId(nid)); } diff --git a/veilid-wasm/src/lib.rs b/veilid-wasm/src/lib.rs index 26da17bf..82295e08 100644 --- a/veilid-wasm/src/lib.rs +++ b/veilid-wasm/src/lib.rs @@ -94,7 +94,7 @@ fn parse_target(s: String) -> APIResult { } // Is this a node id? - if let Ok(nid) = veilid_core::PublicKey::from_str(&s) { + if let Ok(nid) = veilid_core::TypedKey::from_str(&s) { return Ok(veilid_core::Target::NodeId(nid)); }