xfer
This commit is contained in:
		| @@ -29,6 +29,13 @@ pub enum Destination { | ||||
| } | ||||
|  | ||||
| impl Destination { | ||||
|     pub fn target(&self) -> Option<NodeRef> { | ||||
|         match self { | ||||
|             Destination::Direct { target, safety_selection: _ } => Some(target.clone()), | ||||
|             Destination::Relay { relay:_, target, safety_selection: _ } => Some(target.clone()), | ||||
|             Destination::PrivateRoute { private_route:_, safety_selection:_ } => None, | ||||
|         } | ||||
|     } | ||||
|     pub fn direct(target: NodeRef) -> Self { | ||||
|         let sequencing = target.sequencing(); | ||||
|         Self::Direct { | ||||
|   | ||||
| @@ -88,6 +88,7 @@ where | ||||
|             for cn in &ctx.closest_nodes { | ||||
|                 if cn.same_entry(&nn) { | ||||
|                     dup = true; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|             if !dup { | ||||
| @@ -125,6 +126,7 @@ where | ||||
|                     // New fanout call candidate found | ||||
|                     next_node = Some(cn.clone()); | ||||
|                     ctx.called_nodes.add(key); | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|   | ||||
| @@ -395,7 +395,7 @@ impl RPCProcessor { | ||||
|     ////////////////////////////////////////////////////////////////////// | ||||
|  | ||||
|     /// Determine if a SignedNodeInfo can be placed into the specified routing domain | ||||
|     fn filter_node_info( | ||||
|     fn verify_node_info( | ||||
|         &self, | ||||
|         routing_domain: RoutingDomain, | ||||
|         signed_node_info: &SignedNodeInfo, | ||||
| @@ -404,6 +404,37 @@ impl RPCProcessor { | ||||
|         routing_table.signed_node_info_is_valid_in_routing_domain(routing_domain, &signed_node_info) | ||||
|     } | ||||
|  | ||||
|     /// Determine if set of peers is closer to key_near than key_far | ||||
|     fn verify_peers_closer( | ||||
|         &self, | ||||
|         vcrypto: CryptoSystemVersion, | ||||
|         key_far: TypedKey, | ||||
|         key_near: TypedKey, | ||||
|         peers: &[PeerInfo], | ||||
|     ) -> Result<bool, RPCError> { | ||||
|         let kind = vcrypto.kind(); | ||||
|  | ||||
|         if key_far.kind != kind || key_near.kind != kind { | ||||
|             return Err(RPCError::internal("keys all need the same cryptosystem")); | ||||
|         } | ||||
|  | ||||
|         let mut closer = true; | ||||
|         for peer in peers { | ||||
|             let Some(key_peer) = peer.node_ids().get(kind) else { | ||||
|                 return Err(RPCError::invalid_format( | ||||
|                     "peers need to have a key with the same cryptosystem", | ||||
|                 )); | ||||
|             }; | ||||
|             let d_near = vcrypto.distance(&key_near.value, &key_peer.value); | ||||
|             let d_far = vcrypto.distance(&key_far.value, &key_peer.value); | ||||
|             if d_far < d_near { | ||||
|                 closer = false; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(closer) | ||||
|     } | ||||
|  | ||||
|     ////////////////////////////////////////////////////////////////////// | ||||
|  | ||||
|     /// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference | ||||
| @@ -1348,7 +1379,7 @@ impl RPCProcessor { | ||||
|                     // Ensure the sender peer info is for the actual sender specified in the envelope | ||||
|  | ||||
|                     // Sender PeerInfo was specified, update our routing table with it | ||||
|                     if !self.filter_node_info(routing_domain, sender_peer_info.signed_node_info()) { | ||||
|                     if !self.verify_node_info(routing_domain, sender_peer_info.signed_node_info()) { | ||||
|                         return Ok(NetworkResult::invalid_message( | ||||
|                             "sender peerinfo has invalid peer scope", | ||||
|                         )); | ||||
|   | ||||
| @@ -29,9 +29,9 @@ impl RPCProcessor { | ||||
|         let app_call_a = match kind { | ||||
|             RPCOperationKind::Answer(a) => match a.destructure() { | ||||
|                 RPCAnswerDetail::AppCallA(a) => a, | ||||
|                 _ => return Err(RPCError::invalid_format("not an appcall answer")), | ||||
|                 _ => return Ok(NetworkResult::invalid_message("not an appcall answer")), | ||||
|             }, | ||||
|             _ => return Err(RPCError::invalid_format("not an answer")), | ||||
|             _ => return Ok(NetworkResult::invalid_message("not an answer")), | ||||
|         }; | ||||
|  | ||||
|         let a_message = app_call_a.destructure(); | ||||
|   | ||||
| @@ -46,17 +46,17 @@ impl RPCProcessor { | ||||
|         let find_node_a = match kind { | ||||
|             RPCOperationKind::Answer(a) => match a.destructure() { | ||||
|                 RPCAnswerDetail::FindNodeA(a) => a, | ||||
|                 _ => return Err(RPCError::invalid_format("not a find_node answer")), | ||||
|                 _ => return Ok(NetworkResult::invalid_message("not a find_node answer")), | ||||
|             }, | ||||
|             _ => return Err(RPCError::invalid_format("not an answer")), | ||||
|             _ => return Ok(NetworkResult::invalid_message("not an answer")), | ||||
|         }; | ||||
|  | ||||
|         // Verify peers are in the correct peer scope | ||||
|         let peers = find_node_a.destructure(); | ||||
|  | ||||
|         for peer_info in &peers { | ||||
|             if !self.filter_node_info(RoutingDomain::PublicInternet, peer_info.signed_node_info()) { | ||||
|                 return Err(RPCError::invalid_format( | ||||
|             if !self.verify_node_info(RoutingDomain::PublicInternet, peer_info.signed_node_info()) { | ||||
|                 return Ok(NetworkResult::invalid_message( | ||||
|                     "find_node response has invalid peer scope", | ||||
|                 )); | ||||
|             } | ||||
|   | ||||
| @@ -24,32 +24,32 @@ impl RPCProcessor { | ||||
|         last_descriptor: Option<SignedValueDescriptor>, | ||||
|     ) -> Result<NetworkResult<Answer<GetValueAnswer>>, RPCError> { | ||||
|         // Ensure destination never has a private route | ||||
|         if matches!( | ||||
|             dest, | ||||
|             Destination::PrivateRoute { | ||||
|                 private_route: _, | ||||
|                 safety_selection: _ | ||||
|             } | ||||
|         ) { | ||||
|         // and get the target noderef so we can validate the response | ||||
|         let Some(target) = dest.target() else { | ||||
|             return Err(RPCError::internal( | ||||
|                 "Never send get value requests over private routes", | ||||
|                 "Never send set value requests over private routes", | ||||
|             )); | ||||
|         } | ||||
|         }; | ||||
|  | ||||
|         // Get the target node id | ||||
|         let Some(vcrypto) = self.crypto.get(key.kind) else { | ||||
|             return Err(RPCError::internal("unsupported cryptosystem")); | ||||
|         }; | ||||
|         let Some(target_node_id) = target.node_ids().get(key.kind) else { | ||||
|             return Err(RPCError::internal("No node id for crypto kind")); | ||||
|         }; | ||||
|  | ||||
|         // Send the getvalue question | ||||
|         let get_value_q = RPCOperationGetValueQ::new(key, subkey, last_descriptor.is_none()); | ||||
|         let question = RPCQuestion::new( | ||||
|             network_result_try!(self.get_destination_respond_to(&dest)?), | ||||
|             RPCQuestionDetail::GetValueQ(get_value_q), | ||||
|         ); | ||||
|         let Some(vcrypto) = self.crypto.get(key.kind) else { | ||||
|             return Err(RPCError::internal("unsupported cryptosystem")); | ||||
|         }; | ||||
|  | ||||
|         // Send the getvalue question | ||||
|         let question_context = QuestionContext::GetValue(ValidateGetValueContext { | ||||
|             last_descriptor, | ||||
|             subkey, | ||||
|             vcrypto, | ||||
|             vcrypto: vcrypto.clone(), | ||||
|         }); | ||||
|  | ||||
|         let waitable_reply = network_result_try!( | ||||
| @@ -68,13 +68,29 @@ impl RPCProcessor { | ||||
|         let get_value_a = match kind { | ||||
|             RPCOperationKind::Answer(a) => match a.destructure() { | ||||
|                 RPCAnswerDetail::GetValueA(a) => a, | ||||
|                 _ => return Err(RPCError::invalid_format("not a getvalue answer")), | ||||
|                 _ => return Ok(NetworkResult::invalid_message("not a getvalue answer")), | ||||
|             }, | ||||
|             _ => return Err(RPCError::invalid_format("not an answer")), | ||||
|             _ => return Ok(NetworkResult::invalid_message("not an answer")), | ||||
|         }; | ||||
|  | ||||
|         let (value, peers, descriptor) = get_value_a.destructure(); | ||||
|  | ||||
|         // Validate peers returned are, in fact, closer to the key than the node we sent this to | ||||
|         let valid = match self.verify_peers_closer(vcrypto, target_node_id, key, &peers) { | ||||
|             Ok(v) => v, | ||||
|             Err(e) => { | ||||
|                 if matches!(e, RPCError::Internal(_)) { | ||||
|                     return Err(e); | ||||
|                 } | ||||
|                 return Ok(NetworkResult::invalid_message( | ||||
|                     "missing cryptosystem in peers node ids", | ||||
|                 )); | ||||
|             } | ||||
|         }; | ||||
|         if !valid { | ||||
|             return Ok(NetworkResult::invalid_message("non-closer peers returned")); | ||||
|         } | ||||
|  | ||||
|         Ok(NetworkResult::value(Answer::new( | ||||
|             latency, | ||||
|             GetValueAnswer { | ||||
|   | ||||
| @@ -62,7 +62,7 @@ impl RPCProcessor { | ||||
|     ) -> Result<NetworkResult<()>, RPCError> { | ||||
|         // Make sure hop count makes sense | ||||
|         if next_private_route.hop_count as usize > self.unlocked_inner.max_route_hop_count { | ||||
|             return Err(RPCError::protocol( | ||||
|             return Ok(NetworkResult::invalid_message( | ||||
|                 "Private route hop count too high to process", | ||||
|             )); | ||||
|         } | ||||
| @@ -110,9 +110,9 @@ impl RPCProcessor { | ||||
|         // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) | ||||
|         // xxx: punish nodes that send messages that fail to decrypt eventually? How to do this for safety routes? | ||||
|         let node_id_secret = self.routing_table.node_id_secret_key(remote_sr_pubkey.kind); | ||||
|         let dh_secret = vcrypto | ||||
|             .cached_dh(&remote_sr_pubkey.value, &node_id_secret) | ||||
|             .map_err(RPCError::protocol)?; | ||||
|         let Ok(dh_secret) = vcrypto.cached_dh(&remote_sr_pubkey.value, &node_id_secret) else { | ||||
|             return Ok(NetworkResult::invalid_message("dh failed for remote safety route for safety routed operation")); | ||||
|         }; | ||||
|         let body = match vcrypto.decrypt_aead( | ||||
|             routed_operation.data(), | ||||
|             routed_operation.nonce(), | ||||
| @@ -183,19 +183,18 @@ impl RPCProcessor { | ||||
|  | ||||
|         // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) | ||||
|         // xxx: punish nodes that send messages that fail to decrypt eventually. How to do this for private routes? | ||||
|         let dh_secret = vcrypto | ||||
|             .cached_dh(&remote_sr_pubkey.value, &secret_key) | ||||
|             .map_err(RPCError::protocol)?; | ||||
|         let body = vcrypto | ||||
|         let Ok(dh_secret) = vcrypto.cached_dh(&remote_sr_pubkey.value, &secret_key) else { | ||||
|             return Ok(NetworkResult::invalid_message("dh failed for remote safety route for private routed operation")); | ||||
|         }; | ||||
|         let Ok(body) = vcrypto | ||||
|             .decrypt_aead( | ||||
|                 routed_operation.data(), | ||||
|                 routed_operation.nonce(), | ||||
|                 &dh_secret, | ||||
|                 None, | ||||
|             ) | ||||
|             .map_err(RPCError::map_internal( | ||||
|                 "decryption of routed operation failed", | ||||
|             ))?; | ||||
|             ) else { | ||||
|                 return Ok(NetworkResult::invalid_message("decryption of routed operation failed")); | ||||
|             }; | ||||
|  | ||||
|         // Pass message to RPC system | ||||
|         self.enqueue_private_routed_message( | ||||
| @@ -401,37 +400,48 @@ impl RPCProcessor { | ||||
|             SafetyRouteHops::Data(ref route_hop_data) => { | ||||
|                 // Decrypt the blob with DEC(nonce, DH(the SR's public key, this hop's secret) | ||||
|                 let node_id_secret = self.routing_table.node_id_secret_key(crypto_kind); | ||||
|                 let dh_secret = vcrypto | ||||
|                     .cached_dh(&safety_route.public_key.value, &node_id_secret) | ||||
|                     .map_err(RPCError::protocol)?; | ||||
|                 let mut dec_blob_data = vcrypto | ||||
|                 let Ok(dh_secret) = vcrypto | ||||
|                     .cached_dh(&safety_route.public_key.value, &node_id_secret) else { | ||||
|                     return Ok(NetworkResult::invalid_message("dh failed for safety route hop")); | ||||
|                 }; | ||||
|                 let Ok(mut dec_blob_data) = vcrypto | ||||
|                     .decrypt_aead( | ||||
|                         &route_hop_data.blob, | ||||
|                         &route_hop_data.nonce, | ||||
|                         &dh_secret, | ||||
|                         None, | ||||
|                     ) | ||||
|                     .map_err(RPCError::protocol)?; | ||||
|                     else { | ||||
|                     return Ok(NetworkResult::invalid_message("failed to decrypt route hop data for safety route hop")); | ||||
|                 }; | ||||
|  | ||||
|                 // See if this is last hop in safety route, if so, we're decoding a PrivateRoute not a RouteHop | ||||
|                 let Some(dec_blob_tag) = dec_blob_data.pop() else { | ||||
|                     return Ok(NetworkResult::invalid_message("no bytes in blob")); | ||||
|                 }; | ||||
|  | ||||
|                 let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?; | ||||
|                 let Ok(dec_blob_reader) = RPCMessageData::new(dec_blob_data).get_reader() else { | ||||
|                     return Ok(NetworkResult::invalid_message("Failed to decode RPCMessageData from blob")); | ||||
|                 }; | ||||
|  | ||||
|                 // Decode the blob appropriately | ||||
|                 if dec_blob_tag == 1 { | ||||
|                     // PrivateRoute | ||||
|                     let private_route = { | ||||
|                         let pr_reader = dec_blob_reader | ||||
|                             .get_root::<veilid_capnp::private_route::Reader>() | ||||
|                             .map_err(RPCError::protocol)?; | ||||
|                         decode_private_route(&pr_reader)? | ||||
|                         let Ok(pr_reader) = dec_blob_reader | ||||
|                             .get_root::<veilid_capnp::private_route::Reader>() else { | ||||
|                             return Ok(NetworkResult::invalid_message("failed to get private route reader for blob")); | ||||
|                         }; | ||||
|                         let Ok(private_route) = decode_private_route(&pr_reader) else { | ||||
|                             return Ok(NetworkResult::invalid_message("failed to decode private route")); | ||||
|                         }; | ||||
|                         private_route | ||||
|                     }; | ||||
|                      | ||||
|                     // Validate the private route | ||||
|                     private_route.validate(self.crypto.clone()).map_err(RPCError::protocol)?; | ||||
|                     if let Err(_) = private_route.validate(self.crypto.clone()) { | ||||
|                         return Ok(NetworkResult::invalid_message("failed to validate private route")); | ||||
|                     } | ||||
|  | ||||
|                     // Switching from full safety route to private route first hop | ||||
|                     network_result_try!( | ||||
| @@ -445,14 +455,20 @@ impl RPCProcessor { | ||||
|                 } else if dec_blob_tag == 0 { | ||||
|                     // RouteHop | ||||
|                     let route_hop = { | ||||
|                         let rh_reader = dec_blob_reader | ||||
|                             .get_root::<veilid_capnp::route_hop::Reader>() | ||||
|                             .map_err(RPCError::protocol)?; | ||||
|                         decode_route_hop(&rh_reader)? | ||||
|                         let Ok(rh_reader) = dec_blob_reader | ||||
|                             .get_root::<veilid_capnp::route_hop::Reader>() else { | ||||
|                             return Ok(NetworkResult::invalid_message("failed to get route hop reader for blob")); | ||||
|                         }; | ||||
|                         let Ok(route_hop) = decode_route_hop(&rh_reader) else { | ||||
|                             return Ok(NetworkResult::invalid_message("failed to decode route hop")); | ||||
|                         }; | ||||
|                         route_hop | ||||
|                     }; | ||||
|  | ||||
|                     // Validate the route hop | ||||
|                     route_hop.validate(self.crypto.clone()).map_err(RPCError::protocol)?; | ||||
|                     if let Err(_) = route_hop.validate(self.crypto.clone()) { | ||||
|                         return Ok(NetworkResult::invalid_message("failed to validate route hop")); | ||||
|                     } | ||||
|  | ||||
|                     // Continue the full safety route with another hop | ||||
|                     network_result_try!( | ||||
|   | ||||
| @@ -14,7 +14,6 @@ impl RPCProcessor { | ||||
|     /// Because this leaks information about the identity of the node itself, | ||||
|     /// replying to this request received over a private route will leak | ||||
|     /// the identity of the node and defeat the private route. | ||||
|     #[instrument(level = "trace", skip(self), ret, err)] | ||||
|     pub async fn rpc_call_set_value( | ||||
|         self, | ||||
|         dest: Destination, | ||||
| @@ -25,18 +24,22 @@ impl RPCProcessor { | ||||
|         send_descriptor: bool, | ||||
|     ) -> Result<NetworkResult<Answer<SetValueAnswer>>, RPCError> { | ||||
|         // Ensure destination never has a private route | ||||
|         if matches!( | ||||
|             dest, | ||||
|             Destination::PrivateRoute { | ||||
|                 private_route: _, | ||||
|                 safety_selection: _ | ||||
|             } | ||||
|         ) { | ||||
|         // and get the target noderef so we can validate the response | ||||
|         let Some(target) = dest.target() else { | ||||
|             return Err(RPCError::internal( | ||||
|                 "Never send set value requests over private routes", | ||||
|             )); | ||||
|         } | ||||
|         }; | ||||
|  | ||||
|         // Get the target node id | ||||
|         let Some(vcrypto) = self.crypto.get(key.kind) else { | ||||
|             return Err(RPCError::internal("unsupported cryptosystem")); | ||||
|         }; | ||||
|         let Some(target_node_id) = target.node_ids().get(key.kind) else { | ||||
|             return Err(RPCError::internal("No node id for crypto kind")); | ||||
|         }; | ||||
|  | ||||
|         // Send the setvalue question | ||||
|         let set_value_q = RPCOperationSetValueQ::new( | ||||
|             key, | ||||
|             subkey, | ||||
| @@ -51,17 +54,11 @@ impl RPCProcessor { | ||||
|             network_result_try!(self.get_destination_respond_to(&dest)?), | ||||
|             RPCQuestionDetail::SetValueQ(set_value_q), | ||||
|         ); | ||||
|         let Some(vcrypto) = self.crypto.get(key.kind) else { | ||||
|             return Err(RPCError::internal("unsupported cryptosystem")); | ||||
|         }; | ||||
|  | ||||
|         // Send the setvalue question | ||||
|         let question_context = QuestionContext::SetValue(ValidateSetValueContext { | ||||
|             descriptor, | ||||
|             subkey, | ||||
|             vcrypto, | ||||
|             vcrypto: vcrypto.clone(), | ||||
|         }); | ||||
|  | ||||
|         let waitable_reply = network_result_try!( | ||||
|             self.question(dest, question, Some(question_context)) | ||||
|                 .await? | ||||
| @@ -78,13 +75,29 @@ impl RPCProcessor { | ||||
|         let set_value_a = match kind { | ||||
|             RPCOperationKind::Answer(a) => match a.destructure() { | ||||
|                 RPCAnswerDetail::SetValueA(a) => a, | ||||
|                 _ => return Err(RPCError::invalid_format("not a setvalue answer")), | ||||
|                 _ => return Ok(NetworkResult::invalid_message("not a setvalue answer")), | ||||
|             }, | ||||
|             _ => return Err(RPCError::invalid_format("not an answer")), | ||||
|             _ => return Ok(NetworkResult::invalid_message("not an answer")), | ||||
|         }; | ||||
|  | ||||
|         let (set, value, peers) = set_value_a.destructure(); | ||||
|  | ||||
|         // Validate peers returned are, in fact, closer to the key than the node we sent this to | ||||
|         let valid = match self.verify_peers_closer(vcrypto, target_node_id, key, &peers) { | ||||
|             Ok(v) => v, | ||||
|             Err(e) => { | ||||
|                 if matches!(e, RPCError::Internal(_)) { | ||||
|                     return Err(e); | ||||
|                 } | ||||
|                 return Ok(NetworkResult::invalid_message( | ||||
|                     "missing cryptosystem in peers node ids", | ||||
|                 )); | ||||
|             } | ||||
|         }; | ||||
|         if !valid { | ||||
|             return Ok(NetworkResult::invalid_message("non-closer peers returned")); | ||||
|         } | ||||
|  | ||||
|         Ok(NetworkResult::value(Answer::new( | ||||
|             latency, | ||||
|             SetValueAnswer { set, value, peers }, | ||||
|   | ||||
| @@ -119,9 +119,9 @@ impl RPCProcessor { | ||||
|         let status_a = match kind { | ||||
|             RPCOperationKind::Answer(a) => match a.destructure() { | ||||
|                 RPCAnswerDetail::StatusA(a) => a, | ||||
|                 _ => return Err(RPCError::invalid_format("not a status answer")), | ||||
|                 _ => return Ok(NetworkResult::invalid_message("not a status answer")), | ||||
|             }, | ||||
|             _ => return Err(RPCError::invalid_format("not an answer")), | ||||
|             _ => return Ok(NetworkResult::invalid_message("not an answer")), | ||||
|         }; | ||||
|         let (a_node_status, sender_info) = status_a.destructure(); | ||||
|  | ||||
|   | ||||
| @@ -1,7 +1,7 @@ | ||||
| use super::*; | ||||
|  | ||||
| /// The context of the do_get_value operation | ||||
| struct DoGetValueContext { | ||||
| /// The context of the outbound_get_value operation | ||||
| struct OutboundGetValueContext { | ||||
|     /// The latest value of the subkey, may be the value passed in | ||||
|     pub value: Option<SignedValueData>, | ||||
|     /// The consensus count for the value we have received | ||||
| @@ -42,7 +42,7 @@ impl StorageManager { | ||||
|         } else { | ||||
|             None | ||||
|         }; | ||||
|         let context = Arc::new(Mutex::new(DoGetValueContext { | ||||
|         let context = Arc::new(Mutex::new(OutboundGetValueContext { | ||||
|             value: last_subkey_result.value, | ||||
|             value_count: 0, | ||||
|             descriptor: last_subkey_result.descriptor.clone(), | ||||
|   | ||||
| @@ -1,7 +1,7 @@ | ||||
| use super::*; | ||||
|  | ||||
| /// The context of the do_get_value operation | ||||
| struct DoSetValueContext { | ||||
| /// The context of the outbound_set_value operation | ||||
| struct OutboundSetValueContext { | ||||
|     /// The latest value of the subkey, may be the value passed in | ||||
|     pub value: SignedValueData, | ||||
|     /// The consensus count for the value we have received | ||||
| @@ -37,7 +37,7 @@ impl StorageManager { | ||||
|  | ||||
|         // Make do-set-value answer context | ||||
|         let schema = descriptor.schema()?; | ||||
|         let context = Arc::new(Mutex::new(DoSetValueContext { | ||||
|         let context = Arc::new(Mutex::new(OutboundSetValueContext { | ||||
|             value, | ||||
|             value_count: 0, | ||||
|             schema, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user