Merge branch 'issue-326' into 'main'

Fix for propagation of TryAgain conditions

See merge request veilid/veilid!229
This commit is contained in:
Christien Rioux 2023-10-21 23:25:39 +00:00
commit ba7bdf12a8
51 changed files with 958 additions and 689 deletions

23
Cargo.lock generated
View File

@ -1241,6 +1241,16 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "ctor"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e366bff8cd32dd8754b0991fb66b279dc48f598c3a18914852a6673deef583"
dependencies = [
"quote",
"syn 2.0.38",
]
[[package]] [[package]]
name = "ctrlc" name = "ctrlc"
version = "3.4.1" version = "3.4.1"
@ -2684,6 +2694,15 @@ version = "0.2.149"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
[[package]]
name = "libc-print"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17f111e2175c779daaf5e89fe3a3b0776b0adec218bc1159c56e4d3f58032f5"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "libloading" name = "libloading"
version = "0.7.4" version = "0.7.4"
@ -5518,18 +5537,22 @@ name = "veilid-flutter"
version = "0.2.4" version = "0.2.4"
dependencies = [ dependencies = [
"allo-isolate", "allo-isolate",
"android_log-sys 0.3.1",
"async-std", "async-std",
"backtrace", "backtrace",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"ctor",
"data-encoding", "data-encoding",
"ffi-support", "ffi-support",
"futures-util", "futures-util",
"hostname", "hostname",
"jni", "jni",
"lazy_static", "lazy_static",
"libc-print",
"opentelemetry", "opentelemetry",
"opentelemetry-otlp", "opentelemetry-otlp",
"opentelemetry-semantic-conventions", "opentelemetry-semantic-conventions",
"oslog",
"paranoid-android", "paranoid-android",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"serde", "serde",

View File

@ -50,7 +50,11 @@ impl Network {
let mut add = false; let mut add = false;
if let Some(edi) = existing_dial_info.get(&(pt, at)) { if let Some(edi) = existing_dial_info.get(&(pt, at)) {
if did.class <= edi.class { // Is the dial info class better than our existing dial info?
// Or is the new dial info the same class, but different? Only change if things are different.
if did.class < edi.class
|| (did.class == edi.class && did.dial_info != edi.dial_info)
{
// Better or same dial info class was found, clear existing dialinfo for this pt/at pair // Better or same dial info class was found, clear existing dialinfo for this pt/at pair
// Only keep one dial info per protocol/address type combination // Only keep one dial info per protocol/address type combination
clear = true; clear = true;

View File

@ -924,7 +924,7 @@ impl BucketEntry {
impl Drop for BucketEntry { impl Drop for BucketEntry {
fn drop(&mut self) { fn drop(&mut self) {
if self.ref_count.load(Ordering::Relaxed) != 0 { if self.ref_count.load(Ordering::Acquire) != 0 {
#[cfg(feature = "tracking")] #[cfg(feature = "tracking")]
{ {
println!("NodeRef Tracking"); println!("NodeRef Tracking");

View File

@ -379,7 +379,7 @@ impl NodeRef {
entry: Arc<BucketEntry>, entry: Arc<BucketEntry>,
filter: Option<NodeRefFilter>, filter: Option<NodeRefFilter>,
) -> Self { ) -> Self {
entry.ref_count.fetch_add(1u32, Ordering::Relaxed); entry.ref_count.fetch_add(1u32, Ordering::AcqRel);
Self { Self {
common: NodeRefBaseCommon { common: NodeRefBaseCommon {
@ -438,7 +438,7 @@ impl Clone for NodeRef {
self.common self.common
.entry .entry
.ref_count .ref_count
.fetch_add(1u32, Ordering::Relaxed); .fetch_add(1u32, Ordering::AcqRel);
Self { Self {
common: NodeRefBaseCommon { common: NodeRefBaseCommon {
@ -479,7 +479,7 @@ impl Drop for NodeRef {
.common .common
.entry .entry
.ref_count .ref_count
.fetch_sub(1u32, Ordering::Relaxed) .fetch_sub(1u32, Ordering::AcqRel)
- 1; - 1;
if new_ref_count == 0 { if new_ref_count == 0 {
// get node ids with inner unlocked because nothing could be referencing this entry now // get node ids with inner unlocked because nothing could be referencing this entry now

View File

@ -151,20 +151,21 @@ impl RouteSpecStore {
} }
/// Purge the route spec store /// Purge the route spec store
pub async fn purge(&self) -> EyreResult<()> { pub async fn purge(&self) -> VeilidAPIResult<()> {
{ {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
inner.content = Default::default(); inner.content = Default::default();
inner.cache = Default::default(); inner.cache = Default::default();
} }
self.save().await self.save().await.map_err(VeilidAPIError::internal)
} }
/// Create a new route /// Create a new route
/// Prefers nodes that are not currently in use by another route /// Prefers nodes that are not currently in use by another route
/// The route is not yet tested for its reachability /// The route is not yet tested for its reachability
/// Returns None if no route could be allocated at this time /// Returns Err(VeilidAPIError::TryAgain) if no route could be allocated at this time
/// Returns Some route id string /// Returns other errors on failure
/// Returns Ok(route id string) on success
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", skip(self), ret, err)]
pub fn allocate_route( pub fn allocate_route(
&self, &self,
@ -174,7 +175,7 @@ impl RouteSpecStore {
hop_count: usize, hop_count: usize,
directions: DirectionSet, directions: DirectionSet,
avoid_nodes: &[TypedKey], avoid_nodes: &[TypedKey],
) -> EyreResult<Option<RouteId>> { ) -> VeilidAPIResult<RouteId> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let routing_table = self.unlocked_inner.routing_table.clone(); let routing_table = self.unlocked_inner.routing_table.clone();
let rti = &mut *routing_table.inner.write(); let rti = &mut *routing_table.inner.write();
@ -203,21 +204,30 @@ impl RouteSpecStore {
hop_count: usize, hop_count: usize,
directions: DirectionSet, directions: DirectionSet,
avoid_nodes: &[TypedKey], avoid_nodes: &[TypedKey],
) -> EyreResult<Option<RouteId>> { ) -> VeilidAPIResult<RouteId> {
use core::cmp::Ordering; use core::cmp::Ordering;
if hop_count < 1 { if hop_count < 1 {
bail!("Not allocating route less than one hop in length"); apibail_invalid_argument!(
"Not allocating route less than one hop in length",
"hop_count",
hop_count
);
} }
if hop_count > self.unlocked_inner.max_route_hop_count { if hop_count > self.unlocked_inner.max_route_hop_count {
bail!("Not allocating route longer than max route hop count"); apibail_invalid_argument!(
"Not allocating route longer than max route hop count",
"hop_count",
hop_count
);
} }
// Ensure we have a valid network class so our peer info is useful // Ensure we have a valid network class so our peer info is useful
if !rti.has_valid_network_class(RoutingDomain::PublicInternet) { if !rti.has_valid_network_class(RoutingDomain::PublicInternet) {
log_rtab!(debug "unable to allocate route until we have a valid PublicInternet network class"); apibail_try_again!(
return Ok(None); "unable to allocate route until we have a valid PublicInternet network class"
);
}; };
// Get our peer info // Get our peer info
@ -369,8 +379,7 @@ impl RouteSpecStore {
// If we couldn't find enough nodes, wait until we have more nodes in the routing table // If we couldn't find enough nodes, wait until we have more nodes in the routing table
if nodes.len() < hop_count { if nodes.len() < hop_count {
log_rtab!(debug "not enough nodes to construct route at this time"); apibail_try_again!("not enough nodes to construct route at this time");
return Ok(None);
} }
// Get peer info for everything // Get peer info for everything
@ -522,8 +531,7 @@ impl RouteSpecStore {
} }
} }
if route_nodes.is_empty() { if route_nodes.is_empty() {
log_rtab!(debug "unable to find unique route at this time"); apibail_try_again!("unable to find unique route at this time");
return Ok(None);
} }
drop(perm_func); drop(perm_func);
@ -579,7 +587,7 @@ impl RouteSpecStore {
// Keep route in spec store // Keep route in spec store
inner.content.add_detail(id, rssd); inner.content.add_detail(id, rssd);
Ok(Some(id)) Ok(id)
} }
/// validate data using a private route's key and signature chain /// validate data using a private route's key and signature chain
@ -651,17 +659,21 @@ impl RouteSpecStore {
feature = "verbose-tracing", feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err) instrument(level = "trace", skip(self), ret, err)
)] )]
async fn test_allocated_route(&self, private_route_id: RouteId) -> EyreResult<bool> { async fn test_allocated_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> {
// Make loopback route to test with // Make loopback route to test with
let dest = { let dest = {
// Get the best private route for this id // Get the best private route for this id
let (key, hop_count) = { let (key, hop_count) = {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let Some(rssd) = inner.content.get_detail(&private_route_id) else { let Some(rssd) = inner.content.get_detail(&private_route_id) else {
bail!("route id not allocated"); apibail_invalid_argument!(
"route id not allocated",
"private_route_id",
private_route_id
);
}; };
let Some(key) = rssd.get_best_route_set_key() else { let Some(key) = rssd.get_best_route_set_key() else {
bail!("no best key to test allocated route"); apibail_internal!("no best key to test allocated route");
}; };
// Match the private route's hop length for safety route length // Match the private route's hop length for safety route length
let hop_count = rssd.hop_count(); let hop_count = rssd.hop_count();
@ -703,12 +715,12 @@ impl RouteSpecStore {
} }
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", skip(self), ret, err)]
async fn test_remote_route(&self, private_route_id: RouteId) -> EyreResult<bool> { async fn test_remote_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> {
// Make private route test // Make private route test
let dest = { let dest = {
// Get the route to test // Get the route to test
let Some(private_route) = self.best_remote_private_route(&private_route_id) else { let Some(private_route) = self.best_remote_private_route(&private_route_id) else {
bail!("no best key to test remote route"); apibail_internal!("no best key to test remote route");
}; };
// Always test routes with safety routes that are more likely to succeed // Always test routes with safety routes that are more likely to succeed
@ -777,7 +789,7 @@ impl RouteSpecStore {
feature = "verbose-tracing", feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err) instrument(level = "trace", skip(self), ret, err)
)] )]
pub async fn test_route(&self, id: RouteId) -> EyreResult<bool> { pub async fn test_route(&self, id: RouteId) -> VeilidAPIResult<bool> {
let is_remote = self.is_route_id_remote(&id); let is_remote = self.is_route_id_remote(&id);
if is_remote { if is_remote {
self.test_remote_route(id).await self.test_remote_route(id).await
@ -904,13 +916,14 @@ impl RouteSpecStore {
} }
/// Compiles a safety route to the private route, with caching /// Compiles a safety route to the private route, with caching
/// Returns an Err() if the parameters are wrong /// Returns Err(VeilidAPIError::TryAgain) if no allocation could happen at this time (not an error)
/// Returns Ok(None) if no allocation could happen at this time (not an error) /// Returns other Err() if the parameters are wrong
/// Returns Ok(compiled route) on success
pub fn compile_safety_route( pub fn compile_safety_route(
&self, &self,
safety_selection: SafetySelection, safety_selection: SafetySelection,
mut private_route: PrivateRoute, mut private_route: PrivateRoute,
) -> EyreResult<Option<CompiledRoute>> { ) -> VeilidAPIResult<CompiledRoute> {
// let profile_start_ts = get_timestamp(); // let profile_start_ts = get_timestamp();
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let routing_table = self.unlocked_inner.routing_table.clone(); let routing_table = self.unlocked_inner.routing_table.clone();
@ -920,7 +933,7 @@ impl RouteSpecStore {
let crypto_kind = private_route.crypto_kind(); let crypto_kind = private_route.crypto_kind();
let crypto = routing_table.crypto(); let crypto = routing_table.crypto();
let Some(vcrypto) = crypto.get(crypto_kind) else { let Some(vcrypto) = crypto.get(crypto_kind) else {
bail!("crypto not supported for route"); apibail_generic!("crypto not supported for route");
}; };
let pr_pubkey = private_route.public_key.value; let pr_pubkey = private_route.public_key.value;
let pr_hopcount = private_route.hop_count as usize; let pr_hopcount = private_route.hop_count as usize;
@ -928,7 +941,11 @@ impl RouteSpecStore {
// Check private route hop count isn't larger than the max route hop count plus one for the 'first hop' header // Check private route hop count isn't larger than the max route hop count plus one for the 'first hop' header
if pr_hopcount > (max_route_hop_count + 1) { if pr_hopcount > (max_route_hop_count + 1) {
bail!("private route hop count too long"); apibail_invalid_argument!(
"private route hop count too long",
"private_route.hop_count",
pr_hopcount
);
} }
// See if we are using a safety route, if not, short circuit this operation // See if we are using a safety route, if not, short circuit this operation
let safety_spec = match safety_selection { let safety_spec = match safety_selection {
@ -937,24 +954,26 @@ impl RouteSpecStore {
// Safety route stub with the node's public key as the safety route key since it's the 0th hop // Safety route stub with the node's public key as the safety route key since it's the 0th hop
SafetySelection::Unsafe(sequencing) => { SafetySelection::Unsafe(sequencing) => {
let Some(pr_first_hop_node) = private_route.pop_first_hop() else { let Some(pr_first_hop_node) = private_route.pop_first_hop() else {
bail!("compiled private route should have first hop"); apibail_generic!("compiled private route should have first hop");
}; };
let opt_first_hop = match pr_first_hop_node { let opt_first_hop = match pr_first_hop_node {
RouteNode::NodeId(id) => { RouteNode::NodeId(id) => rti
rti.lookup_node_ref(routing_table.clone(), TypedKey::new(crypto_kind, id))? .lookup_node_ref(routing_table.clone(), TypedKey::new(crypto_kind, id))
} .map_err(VeilidAPIError::internal)?,
RouteNode::PeerInfo(pi) => Some(rti.register_node_with_peer_info( RouteNode::PeerInfo(pi) => Some(
rti.register_node_with_peer_info(
routing_table.clone(), routing_table.clone(),
RoutingDomain::PublicInternet, RoutingDomain::PublicInternet,
*pi, *pi,
false, false,
)?), )
.map_err(VeilidAPIError::internal)?,
),
}; };
if opt_first_hop.is_none() { if opt_first_hop.is_none() {
// Can't reach this private route any more // Can't reach this private route any more
log_rtab!(debug "can't reach private route any more"); apibail_generic!("can't reach private route any more");
return Ok(None);
} }
let mut first_hop = opt_first_hop.unwrap(); let mut first_hop = opt_first_hop.unwrap();
@ -963,14 +982,14 @@ impl RouteSpecStore {
// Return the compiled safety route // Return the compiled safety route
//println!("compile_safety_route profile (stub): {} us", (get_timestamp() - profile_start_ts)); //println!("compile_safety_route profile (stub): {} us", (get_timestamp() - profile_start_ts));
return Ok(Some(CompiledRoute { return Ok(CompiledRoute {
safety_route: SafetyRoute::new_stub( safety_route: SafetyRoute::new_stub(
routing_table.node_id(crypto_kind), routing_table.node_id(crypto_kind),
private_route, private_route,
), ),
secret: routing_table.node_id_secret_key(crypto_kind), secret: routing_table.node_id_secret_key(crypto_kind),
first_hop, first_hop,
})); });
} }
}; };
@ -983,9 +1002,9 @@ impl RouteSpecStore {
pr_pubkey pr_pubkey
} else { } else {
let Some(avoid_node_id) = private_route.first_hop_node_id() else { let Some(avoid_node_id) = private_route.first_hop_node_id() else {
bail!("compiled private route should have first hop"); apibail_generic!("compiled private route should have first hop");
}; };
let Some(sr_pubkey) = self.get_route_for_safety_spec_inner( self.get_route_for_safety_spec_inner(
inner, inner,
rti, rti,
crypto_kind, crypto_kind,
@ -993,22 +1012,17 @@ impl RouteSpecStore {
Direction::Outbound.into(), Direction::Outbound.into(),
&[avoid_node_id], &[avoid_node_id],
)? )?
else {
// No safety route could be found for this spec
return Ok(None);
};
sr_pubkey
}; };
// Look up a few things from the safety route detail we want for the compiled route and don't borrow inner // Look up a few things from the safety route detail we want for the compiled route and don't borrow inner
let Some(safety_route_id) = inner.content.get_id_by_key(&sr_pubkey) else { let Some(safety_route_id) = inner.content.get_id_by_key(&sr_pubkey) else {
bail!("route id missing"); apibail_generic!("safety route id missing");
}; };
let Some(safety_rssd) = inner.content.get_detail(&safety_route_id) else { let Some(safety_rssd) = inner.content.get_detail(&safety_route_id) else {
bail!("route set detail missing"); apibail_internal!("safety route set detail missing");
}; };
let Some(safety_rsd) = safety_rssd.get_route_by_key(&sr_pubkey) else { let Some(safety_rsd) = safety_rssd.get_route_by_key(&sr_pubkey) else {
bail!("route detail missing"); apibail_internal!("safety route detail missing");
}; };
// We can optimize the peer info in this safety route if it has been successfully // We can optimize the peer info in this safety route if it has been successfully
@ -1040,7 +1054,7 @@ impl RouteSpecStore {
}; };
// Return compiled route // Return compiled route
//println!("compile_safety_route profile (cached): {} us", (get_timestamp() - profile_start_ts)); //println!("compile_safety_route profile (cached): {} us", (get_timestamp() - profile_start_ts));
return Ok(Some(compiled_route)); return Ok(compiled_route);
} }
} }
@ -1071,10 +1085,10 @@ impl RouteSpecStore {
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr)) // Encrypt the previous blob ENC(nonce, DH(PKhop,SKsr))
let dh_secret = vcrypto let dh_secret = vcrypto
.cached_dh(&safety_rsd.hops[h], &safety_rsd.secret_key) .cached_dh(&safety_rsd.hops[h], &safety_rsd.secret_key)
.wrap_err("dh failed")?; .map_err(VeilidAPIError::internal)?;
let enc_msg_data = vcrypto let enc_msg_data = vcrypto
.encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None) .encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.wrap_err("encryption failed")?; .map_err(VeilidAPIError::internal)?;
// Make route hop data // Make route hop data
let route_hop_data = RouteHopData { let route_hop_data = RouteHopData {
@ -1098,7 +1112,7 @@ impl RouteSpecStore {
}) })
.flatten(); .flatten();
if pi.is_none() { if pi.is_none() {
bail!("peer info should exist for route but doesn't"); apibail_internal!("peer info should exist for route but doesn't");
} }
RouteNode::PeerInfo(Box::new(pi.unwrap())) RouteNode::PeerInfo(Box::new(pi.unwrap()))
}, },
@ -1123,10 +1137,10 @@ impl RouteSpecStore {
// Encode first RouteHopData // Encode first RouteHopData
let dh_secret = vcrypto let dh_secret = vcrypto
.cached_dh(&safety_rsd.hops[0], &safety_rsd.secret_key) .cached_dh(&safety_rsd.hops[0], &safety_rsd.secret_key)
.map_err(RPCError::map_internal("dh failed"))?; .map_err(VeilidAPIError::internal)?;
let enc_msg_data = vcrypto let enc_msg_data = vcrypto
.encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None) .encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?; .map_err(VeilidAPIError::internal)?;
let route_hop_data = RouteHopData { let route_hop_data = RouteHopData {
nonce, nonce,
@ -1159,7 +1173,7 @@ impl RouteSpecStore {
// Return compiled route // Return compiled route
//println!("compile_safety_route profile (uncached): {} us", (get_timestamp() - profile_start_ts)); //println!("compile_safety_route profile (uncached): {} us", (get_timestamp() - profile_start_ts));
Ok(Some(compiled_route)) Ok(compiled_route)
} }
/// Get an allocated route that matches a particular safety spec /// Get an allocated route that matches a particular safety spec
@ -1175,14 +1189,22 @@ impl RouteSpecStore {
safety_spec: &SafetySpec, safety_spec: &SafetySpec,
direction: DirectionSet, direction: DirectionSet,
avoid_nodes: &[TypedKey], avoid_nodes: &[TypedKey],
) -> EyreResult<Option<PublicKey>> { ) -> VeilidAPIResult<PublicKey> {
// Ensure the total hop count isn't too long for our config // Ensure the total hop count isn't too long for our config
let max_route_hop_count = self.unlocked_inner.max_route_hop_count; let max_route_hop_count = self.unlocked_inner.max_route_hop_count;
if safety_spec.hop_count == 0 { if safety_spec.hop_count == 0 {
bail!("safety route hop count is zero"); apibail_invalid_argument!(
"safety route hop count is zero",
"safety_spec.hop_count",
safety_spec.hop_count
);
} }
if safety_spec.hop_count > max_route_hop_count { if safety_spec.hop_count > max_route_hop_count {
bail!("safety route hop count too long"); apibail_invalid_argument!(
"safety route hop count too long",
"safety_spec.hop_count",
safety_spec.hop_count
);
} }
// See if the preferred route is here // See if the preferred route is here
@ -1192,7 +1214,7 @@ impl RouteSpecStore {
if let Some(preferred_key) = preferred_rssd.get_route_set_keys().get(crypto_kind) { if let Some(preferred_key) = preferred_rssd.get_route_set_keys().get(crypto_kind) {
// Only use the preferred route if it doesn't contain the avoid nodes // Only use the preferred route if it doesn't contain the avoid nodes
if !preferred_rssd.contains_nodes(avoid_nodes) { if !preferred_rssd.contains_nodes(avoid_nodes) {
return Ok(Some(preferred_key.value)); return Ok(preferred_key.value);
} }
} }
} }
@ -1213,8 +1235,7 @@ impl RouteSpecStore {
sr_route_id sr_route_id
} else { } else {
// No route found, gotta allocate one // No route found, gotta allocate one
let Some(sr_route_id) = self self.allocate_route_inner(
.allocate_route_inner(
inner, inner,
rti, rti,
&[crypto_kind], &[crypto_kind],
@ -1223,12 +1244,7 @@ impl RouteSpecStore {
safety_spec.hop_count, safety_spec.hop_count,
direction, direction,
avoid_nodes, avoid_nodes,
) )?
.map_err(RPCError::internal)?
else {
return Ok(None);
};
sr_route_id
}; };
let sr_pubkey = inner let sr_pubkey = inner
@ -1240,7 +1256,7 @@ impl RouteSpecStore {
.unwrap() .unwrap()
.value; .value;
Ok(Some(sr_pubkey)) Ok(sr_pubkey)
} }
/// Get a private route to use for the answer to question /// Get a private route to use for the answer to question
@ -1253,7 +1269,7 @@ impl RouteSpecStore {
crypto_kind: CryptoKind, crypto_kind: CryptoKind,
safety_spec: &SafetySpec, safety_spec: &SafetySpec,
avoid_nodes: &[TypedKey], avoid_nodes: &[TypedKey],
) -> EyreResult<Option<PublicKey>> { ) -> VeilidAPIResult<PublicKey> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let routing_table = self.unlocked_inner.routing_table.clone(); let routing_table = self.unlocked_inner.routing_table.clone();
let rti = &mut *routing_table.inner.write(); let rti = &mut *routing_table.inner.write();
@ -1273,22 +1289,24 @@ impl RouteSpecStore {
key: &PublicKey, key: &PublicKey,
rsd: &RouteSpecDetail, rsd: &RouteSpecDetail,
optimized: bool, optimized: bool,
) -> EyreResult<PrivateRoute> { ) -> VeilidAPIResult<PrivateRoute> {
let routing_table = self.unlocked_inner.routing_table.clone(); let routing_table = self.unlocked_inner.routing_table.clone();
let rti = &*routing_table.inner.read(); let rti = &*routing_table.inner.read();
// Ensure we get the crypto for it // Ensure we get the crypto for it
let crypto = routing_table.network_manager().crypto(); let crypto = routing_table.network_manager().crypto();
let Some(vcrypto) = crypto.get(rsd.crypto_kind) else { let Some(vcrypto) = crypto.get(rsd.crypto_kind) else {
bail!("crypto not supported for route"); apibail_invalid_argument!(
"crypto not supported for route",
"rsd.crypto_kind",
rsd.crypto_kind
);
}; };
// Ensure our network class is valid before attempting to assemble any routes // Ensure our network class is valid before attempting to assemble any routes
if !rti.has_valid_network_class(RoutingDomain::PublicInternet) { if !rti.has_valid_network_class(RoutingDomain::PublicInternet) {
let peer_info = rti.get_own_peer_info(RoutingDomain::PublicInternet); apibail_try_again!(
bail!( "unable to assemble route until we have a valid PublicInternet network class"
"can't make private routes until our node info is valid: {:?}",
peer_info
); );
} }
@ -1296,7 +1314,11 @@ impl RouteSpecStore {
let mut route_hop = RouteHop { let mut route_hop = RouteHop {
node: if optimized { node: if optimized {
let Some(node_id) = routing_table.node_ids().get(rsd.crypto_kind) else { let Some(node_id) = routing_table.node_ids().get(rsd.crypto_kind) else {
bail!("missing node id for crypto kind"); apibail_invalid_argument!(
"missing node id for crypto kind",
"rsd.crypto_kind",
rsd.crypto_kind
);
}; };
RouteNode::NodeId(node_id.value) RouteNode::NodeId(node_id.value)
} else { } else {
@ -1320,12 +1342,9 @@ impl RouteSpecStore {
}; };
// Encrypt the previous blob ENC(nonce, DH(PKhop,SKpr)) // Encrypt the previous blob ENC(nonce, DH(PKhop,SKpr))
let dh_secret = vcrypto let dh_secret = vcrypto.cached_dh(&rsd.hops[h], &rsd.secret_key)?;
.cached_dh(&rsd.hops[h], &rsd.secret_key) let enc_msg_data =
.wrap_err("dh failed")?; vcrypto.encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)?;
let enc_msg_data = vcrypto
.encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.wrap_err("encryption failed")?;
let route_hop_data = RouteHopData { let route_hop_data = RouteHopData {
nonce, nonce,
blob: enc_msg_data, blob: enc_msg_data,
@ -1346,7 +1365,7 @@ impl RouteSpecStore {
}) })
.flatten(); .flatten();
if pi.is_none() { if pi.is_none() {
bail!("peer info should exist for route but doesn't",); apibail_internal!("peer info should exist for route but doesn't");
} }
RouteNode::PeerInfo(Box::new(pi.unwrap())) RouteNode::PeerInfo(Box::new(pi.unwrap()))
}, },
@ -1373,13 +1392,13 @@ impl RouteSpecStore {
&self, &self,
key: &PublicKey, key: &PublicKey,
optimized: Option<bool>, optimized: Option<bool>,
) -> EyreResult<PrivateRoute> { ) -> VeilidAPIResult<PrivateRoute> {
let inner = &*self.inner.lock(); let inner = &*self.inner.lock();
let Some(rsid) = inner.content.get_id_by_key(key) else { let Some(rsid) = inner.content.get_id_by_key(key) else {
bail!("route key does not exist"); apibail_invalid_argument!("route key does not exist", "key", key);
}; };
let Some(rssd) = inner.content.get_detail(&rsid) else { let Some(rssd) = inner.content.get_detail(&rsid) else {
bail!("route id does not exist"); apibail_internal!("route id does not exist");
}; };
// See if we can optimize this compilation yet // See if we can optimize this compilation yet
@ -1406,10 +1425,10 @@ impl RouteSpecStore {
&self, &self,
id: &RouteId, id: &RouteId,
optimized: Option<bool>, optimized: Option<bool>,
) -> EyreResult<Vec<PrivateRoute>> { ) -> VeilidAPIResult<Vec<PrivateRoute>> {
let inner = &*self.inner.lock(); let inner = &*self.inner.lock();
let Some(rssd) = inner.content.get_detail(id) else { let Some(rssd) = inner.content.get_detail(id) else {
bail!("route id does not exist"); apibail_invalid_argument!("route id does not exist", "id", id);
}; };
// See if we can optimize this compilation yet // See if we can optimize this compilation yet
@ -1433,7 +1452,7 @@ impl RouteSpecStore {
feature = "verbose-tracing", feature = "verbose-tracing",
instrument(level = "trace", skip(self, blob), ret, err) instrument(level = "trace", skip(self, blob), ret, err)
)] )]
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> EyreResult<RouteId> { pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
let cur_ts = get_aligned_timestamp(); let cur_ts = get_aligned_timestamp();
// decode the pr blob // decode the pr blob
@ -1450,7 +1469,7 @@ impl RouteSpecStore {
for private_route in &private_routes { for private_route in &private_routes {
// ensure private route has first hop // ensure private route has first hop
if !matches!(private_route.hops, PrivateRouteHops::FirstHop(_)) { if !matches!(private_route.hops, PrivateRouteHops::FirstHop(_)) {
bail!("private route must have first hop"); apibail_generic!("private route must have first hop");
} }
// ensure this isn't also an allocated route // ensure this isn't also an allocated route
@ -1528,7 +1547,7 @@ impl RouteSpecStore {
&self, &self,
key: &PublicKey, key: &PublicKey,
cur_ts: Timestamp, cur_ts: Timestamp,
) -> EyreResult<()> { ) -> VeilidAPIResult<()> {
let our_node_info_ts = self let our_node_info_ts = self
.unlocked_inner .unlocked_inner
.routing_table .routing_table
@ -1550,7 +1569,7 @@ impl RouteSpecStore {
} }
} }
bail!("private route is missing from store: {}", key); apibail_invalid_argument!("private route is missing from store", "key", key);
} }
/// Get the route statistics for any route we know about, local or remote /// Get the route statistics for any route we know about, local or remote
@ -1589,6 +1608,8 @@ impl RouteSpecStore {
/// Clear caches when local our local node info changes /// Clear caches when local our local node info changes
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip(self))]
pub fn reset(&self) { pub fn reset(&self) {
log_rtab!(debug "flushing route spec store");
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
// Clean up local allocated routes // Clean up local allocated routes
@ -1601,10 +1622,10 @@ impl RouteSpecStore {
/// Mark route as published /// Mark route as published
/// When first deserialized, routes must be re-published in order to ensure they remain /// When first deserialized, routes must be re-published in order to ensure they remain
/// in the RouteSpecStore. /// in the RouteSpecStore.
pub fn mark_route_published(&self, id: &RouteId, published: bool) -> EyreResult<()> { pub fn mark_route_published(&self, id: &RouteId, published: bool) -> VeilidAPIResult<()> {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let Some(rssd) = inner.content.get_detail_mut(id) else { let Some(rssd) = inner.content.get_detail_mut(id) else {
bail!("route does not exist"); apibail_invalid_argument!("route does not exist", "id", id);
}; };
rssd.set_published(published); rssd.set_published(published);
Ok(()) Ok(())
@ -1622,13 +1643,13 @@ impl RouteSpecStore {
} }
/// Convert private route list to binary blob /// Convert private route list to binary blob
pub fn private_routes_to_blob(private_routes: &[PrivateRoute]) -> EyreResult<Vec<u8>> { pub fn private_routes_to_blob(private_routes: &[PrivateRoute]) -> VeilidAPIResult<Vec<u8>> {
let mut buffer = vec![]; let mut buffer = vec![];
// Serialize count // Serialize count
let pr_count = private_routes.len(); let pr_count = private_routes.len();
if pr_count > MAX_CRYPTO_KINDS { if pr_count > MAX_CRYPTO_KINDS {
bail!("too many crypto kinds to encode blob"); apibail_internal!("too many crypto kinds to encode blob");
} }
let pr_count = pr_count as u8; let pr_count = pr_count as u8;
buffer.push(pr_count); buffer.push(pr_count);
@ -1639,25 +1660,31 @@ impl RouteSpecStore {
let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>(); let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>();
encode_private_route(private_route, &mut pr_builder) encode_private_route(private_route, &mut pr_builder)
.wrap_err("failed to encode private route")?; .map_err(VeilidAPIError::internal)?;
capnp::serialize_packed::write_message(&mut buffer, &pr_message) capnp::serialize_packed::write_message(&mut buffer, &pr_message)
.map_err(RPCError::internal) .map_err(RPCError::internal)?;
.wrap_err("failed to convert builder to vec")?;
} }
Ok(buffer) Ok(buffer)
} }
/// Convert binary blob to private route /// Convert binary blob to private route vector
pub fn blob_to_private_routes(crypto: Crypto, blob: Vec<u8>) -> EyreResult<Vec<PrivateRoute>> { pub fn blob_to_private_routes(
crypto: Crypto,
blob: Vec<u8>,
) -> VeilidAPIResult<Vec<PrivateRoute>> {
// Deserialize count // Deserialize count
if blob.is_empty() { if blob.is_empty() {
bail!("not deserializing empty private route blob"); apibail_invalid_argument!(
"not deserializing empty private route blob",
"blob.is_empty",
true
);
} }
let pr_count = blob[0] as usize; let pr_count = blob[0] as usize;
if pr_count > MAX_CRYPTO_KINDS { if pr_count > MAX_CRYPTO_KINDS {
bail!("too many crypto kinds to decode blob"); apibail_invalid_argument!("too many crypto kinds to decode blob", "blob[0]", pr_count);
} }
// Deserialize stream of private routes // Deserialize stream of private routes
@ -1668,18 +1695,17 @@ impl RouteSpecStore {
&mut pr_slice, &mut pr_slice,
capnp::message::ReaderOptions::new(), capnp::message::ReaderOptions::new(),
) )
.map_err(RPCError::internal) .map_err(|e| VeilidAPIError::invalid_argument("failed to read blob", "e", e))?;
.wrap_err("failed to make message reader")?;
let pr_reader = reader let pr_reader = reader
.get_root::<veilid_capnp::private_route::Reader>() .get_root::<veilid_capnp::private_route::Reader>()
.map_err(RPCError::internal) .map_err(VeilidAPIError::internal)?;
.wrap_err("failed to make reader for private_route")?; let private_route = decode_private_route(&pr_reader).map_err(|e| {
let private_route = VeilidAPIError::invalid_argument("failed to decode private route", "e", e)
decode_private_route(&pr_reader).wrap_err("failed to decode private route")?; })?;
private_route private_route.validate(crypto.clone()).map_err(|e| {
.validate(crypto.clone()) VeilidAPIError::invalid_argument("failed to validate private route", "e", e)
.wrap_err("failed to validate private route")?; })?;
out.push(private_route); out.push(private_route);
} }
@ -1691,7 +1717,7 @@ impl RouteSpecStore {
} }
/// Generate RouteId from typed key set of route public keys /// Generate RouteId from typed key set of route public keys
fn generate_allocated_route_id(&self, rssd: &RouteSetSpecDetail) -> EyreResult<RouteId> { fn generate_allocated_route_id(&self, rssd: &RouteSetSpecDetail) -> VeilidAPIResult<RouteId> {
let route_set_keys = rssd.get_route_set_keys(); let route_set_keys = rssd.get_route_set_keys();
let crypto = self.unlocked_inner.routing_table.crypto(); let crypto = self.unlocked_inner.routing_table.crypto();
@ -1706,7 +1732,7 @@ impl RouteSpecStore {
idbytes.extend_from_slice(&tk.value.bytes); idbytes.extend_from_slice(&tk.value.bytes);
} }
let Some(best_kind) = best_kind else { let Some(best_kind) = best_kind else {
bail!("no compatible crypto kinds in route"); apibail_internal!("no compatible crypto kinds in route");
}; };
let vcrypto = crypto.get(best_kind).unwrap(); let vcrypto = crypto.get(best_kind).unwrap();
@ -1714,7 +1740,10 @@ impl RouteSpecStore {
} }
/// Generate RouteId from set of private routes /// Generate RouteId from set of private routes
fn generate_remote_route_id(&self, private_routes: &[PrivateRoute]) -> EyreResult<RouteId> { fn generate_remote_route_id(
&self,
private_routes: &[PrivateRoute],
) -> VeilidAPIResult<RouteId> {
let crypto = self.unlocked_inner.routing_table.crypto(); let crypto = self.unlocked_inner.routing_table.crypto();
let mut idbytes = Vec::with_capacity(PUBLIC_KEY_LENGTH * private_routes.len()); let mut idbytes = Vec::with_capacity(PUBLIC_KEY_LENGTH * private_routes.len());
@ -1729,7 +1758,7 @@ impl RouteSpecStore {
idbytes.extend_from_slice(&private_route.public_key.value.bytes); idbytes.extend_from_slice(&private_route.public_key.value.bytes);
} }
let Some(best_kind) = best_kind else { let Some(best_kind) = best_kind else {
bail!("no compatible crypto kinds in route"); apibail_internal!("no compatible crypto kinds in route");
}; };
let vcrypto = crypto.get(best_kind).unwrap(); let vcrypto = crypto.get(best_kind).unwrap();

View File

@ -140,7 +140,7 @@ impl RoutingDomainEditor {
log_rtab!(debug "[{:?}] COMMIT: {:?}", self.routing_domain, self.changes); log_rtab!(debug "[{:?}] COMMIT: {:?}", self.routing_domain, self.changes);
// Apply changes // Apply changes
let mut changed = false; let mut peer_info_changed = false;
{ {
let mut inner = self.routing_table.inner.write(); let mut inner = self.routing_table.inner.write();
inner.with_routing_domain_mut(self.routing_domain, |detail| { inner.with_routing_domain_mut(self.routing_domain, |detail| {
@ -167,22 +167,21 @@ impl RoutingDomainEditor {
detail detail
.common_mut() .common_mut()
.clear_dial_info_details(address_type, protocol_type); .clear_dial_info_details(address_type, protocol_type);
changed = true; peer_info_changed = true;
} }
RoutingDomainChange::ClearRelayNode => { RoutingDomainChange::ClearRelayNode => {
info!("[{:?}] cleared relay node", self.routing_domain); info!("[{:?}] cleared relay node", self.routing_domain);
detail.common_mut().set_relay_node(None); detail.common_mut().set_relay_node(None);
changed = true; peer_info_changed = true;
} }
RoutingDomainChange::SetRelayNode { relay_node } => { RoutingDomainChange::SetRelayNode { relay_node } => {
info!("[{:?}] set relay node: {}", self.routing_domain, relay_node); info!("[{:?}] set relay node: {}", self.routing_domain, relay_node);
detail.common_mut().set_relay_node(Some(relay_node.clone())); detail.common_mut().set_relay_node(Some(relay_node.clone()));
changed = true; peer_info_changed = true;
} }
RoutingDomainChange::SetRelayNodeKeepalive { ts } => { RoutingDomainChange::SetRelayNodeKeepalive { ts } => {
debug!("[{:?}] relay node keepalive: {:?}", self.routing_domain, ts); debug!("[{:?}] relay node keepalive: {:?}", self.routing_domain, ts);
detail.common_mut().set_relay_node_last_keepalive(ts); detail.common_mut().set_relay_node_last_keepalive(ts);
changed = true;
} }
RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => { RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => {
info!( info!(
@ -195,7 +194,7 @@ impl RoutingDomainEditor {
.common_mut() .common_mut()
.add_dial_info_detail(dial_info_detail.clone()); .add_dial_info_detail(dial_info_detail.clone());
changed = true; peer_info_changed = true;
} }
RoutingDomainChange::SetupNetwork { RoutingDomainChange::SetupNetwork {
outbound_protocols, outbound_protocols,
@ -229,7 +228,7 @@ impl RoutingDomainEditor {
address_types, address_types,
capabilities.clone(), capabilities.clone(),
); );
changed = true; peer_info_changed = true;
} }
} }
RoutingDomainChange::SetNetworkClass { network_class } => { RoutingDomainChange::SetNetworkClass { network_class } => {
@ -246,19 +245,19 @@ impl RoutingDomainEditor {
info!("[{:?}] cleared network class", self.routing_domain,); info!("[{:?}] cleared network class", self.routing_domain,);
} }
detail.common_mut().set_network_class(network_class); detail.common_mut().set_network_class(network_class);
changed = true; peer_info_changed = true;
} }
} }
} }
} }
}); });
if changed { if peer_info_changed {
// Allow signed node info updates at same timestamp for otherwise dead nodes if our network has changed // Allow signed node info updates at same timestamp for otherwise dead nodes if our network has changed
inner.reset_all_updated_since_last_network_change(); inner.reset_all_updated_since_last_network_change();
} }
} }
// Clear the routespecstore cache if our PublicInternet dial info has changed // Clear the routespecstore cache if our PublicInternet dial info has changed
if changed && self.routing_domain == RoutingDomain::PublicInternet { if peer_info_changed && self.routing_domain == RoutingDomain::PublicInternet {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table.route_spec_store();
rss.reset(); rss.reset();
} }

View File

@ -132,6 +132,13 @@ impl RoutingTable {
async move { async move {
let success = match rss.test_route(r).await { let success = match rss.test_route(r).await {
Ok(v) => v, Ok(v) => v,
// Route was already removed
Err(VeilidAPIError::InvalidArgument {
context: _,
argument: _,
value: _,
}) => false,
// Other failures
Err(e) => { Err(e) => {
log_rtab!(error "Test route failed: {}", e); log_rtab!(error "Test route failed: {}", e);
ctx.lock().failed = true; ctx.lock().failed = true;
@ -205,15 +212,21 @@ impl RoutingTable {
for _n in 0..routes_to_allocate { for _n in 0..routes_to_allocate {
// Parameters here must be the most inclusive safety route spec // Parameters here must be the most inclusive safety route spec
// These will be used by test_remote_route as well // These will be used by test_remote_route as well
if let Some(k) = rss.allocate_route( match rss.allocate_route(
&VALID_CRYPTO_KINDS, &VALID_CRYPTO_KINDS,
Stability::default(), Stability::default(),
Sequencing::EnsureOrdered, Sequencing::EnsureOrdered,
default_route_hop_count, default_route_hop_count,
DirectionSet::all(), DirectionSet::all(),
&[], &[],
)? { ) {
newly_allocated_routes.push(k); Err(VeilidAPIError::TryAgain { message }) => {
log_rtab!(debug "Route allocation unavailable: {}", message);
}
Err(e) => return Err(e.into()),
Ok(v) => {
newly_allocated_routes.push(v);
}
} }
} }

View File

@ -164,7 +164,7 @@ impl RPCProcessor {
pub(super) fn get_destination_respond_to( pub(super) fn get_destination_respond_to(
&self, &self,
dest: &Destination, dest: &Destination,
) -> Result<NetworkResult<RespondTo>, RPCError> { ) -> RPCNetworkResult<RespondTo> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let rss = routing_table.route_spec_store(); let rss = routing_table.route_spec_store();
@ -180,23 +180,18 @@ impl RPCProcessor {
SafetySelection::Safe(safety_spec) => { SafetySelection::Safe(safety_spec) => {
// Sent directly but with a safety route, respond to private route // Sent directly but with a safety route, respond to private route
let crypto_kind = target.best_node_id().kind; let crypto_kind = target.best_node_id().kind;
let Some(pr_key) = rss let pr_key = network_result_try!(rss
.get_private_route_for_safety_spec( .get_private_route_for_safety_spec(
crypto_kind, crypto_kind,
safety_spec, safety_spec,
&target.node_ids(), &target.node_ids(),
) )
.map_err(RPCError::internal)? .to_rpc_network_result()?);
else {
return Ok(NetworkResult::no_connection_other(
"no private route for response at this time",
));
};
// Get the assembled route for response // Get the assembled route for response
let private_route = rss let private_route = network_result_try!(rss
.assemble_private_route(&pr_key, None) .assemble_private_route(&pr_key, None)
.map_err(RPCError::internal)?; .to_rpc_network_result()?);
Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route))) Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route)))
} }
@ -216,19 +211,14 @@ impl RPCProcessor {
let mut avoid_nodes = relay.node_ids(); let mut avoid_nodes = relay.node_ids();
avoid_nodes.add_all(&target.node_ids()); avoid_nodes.add_all(&target.node_ids());
let Some(pr_key) = rss let pr_key = network_result_try!(rss
.get_private_route_for_safety_spec(crypto_kind, safety_spec, &avoid_nodes) .get_private_route_for_safety_spec(crypto_kind, safety_spec, &avoid_nodes,)
.map_err(RPCError::internal)? .to_rpc_network_result()?);
else {
return Ok(NetworkResult::no_connection_other(
"no private route for response at this time",
));
};
// Get the assembled route for response // Get the assembled route for response
let private_route = rss let private_route = network_result_try!(rss
.assemble_private_route(&pr_key, None) .assemble_private_route(&pr_key, None)
.map_err(RPCError::internal)?; .to_rpc_network_result()?);
Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route))) Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route)))
} }
@ -249,7 +239,7 @@ impl RPCProcessor {
SafetySelection::Unsafe(_) => { SafetySelection::Unsafe(_) => {
// Sent to a private route with no safety route, use a stub safety route for the response // Sent to a private route with no safety route, use a stub safety route for the response
if !routing_table.has_valid_network_class(RoutingDomain::PublicInternet) { if !routing_table.has_valid_network_class(RoutingDomain::PublicInternet) {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::service_unavailable(
"Own node info must be valid to use private route", "Own node info must be valid to use private route",
)); ));
} }
@ -282,25 +272,19 @@ impl RPCProcessor {
private_route.public_key.value private_route.public_key.value
} else { } else {
// Get the private route to respond to that matches the safety route spec we sent the request with // Get the private route to respond to that matches the safety route spec we sent the request with
let Some(pr_key) = rss network_result_try!(rss
.get_private_route_for_safety_spec( .get_private_route_for_safety_spec(
crypto_kind, crypto_kind,
safety_spec, safety_spec,
&[avoid_node_id], &[avoid_node_id],
) )
.map_err(RPCError::internal)? .to_rpc_network_result()?)
else {
return Ok(NetworkResult::no_connection_other(
"no private route for response at this time",
));
};
pr_key
}; };
// Get the assembled route for response // Get the assembled route for response
let private_route = rss let private_route = network_result_try!(rss
.assemble_private_route(&pr_key, None) .assemble_private_route(&pr_key, None)
.map_err(RPCError::internal)?; .to_rpc_network_result()?);
Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route))) Ok(NetworkResult::Value(RespondTo::PrivateRoute(private_route)))
} }

View File

@ -8,7 +8,7 @@ where
result: Option<Result<R, RPCError>>, result: Option<Result<R, RPCError>>,
} }
pub type FanoutCallReturnType = Result<Option<Vec<PeerInfo>>, RPCError>; pub type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>;
pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>; pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter { pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
@ -132,7 +132,7 @@ where
// Do the call for this node // Do the call for this node
match (self.call_routine)(next_node.clone()).await { match (self.call_routine)(next_node.clone()).await {
Ok(Some(v)) => { Ok(NetworkResult::Value(v)) => {
// Filter returned nodes // Filter returned nodes
let filtered_v: Vec<PeerInfo> = v let filtered_v: Vec<PeerInfo> = v
.into_iter() .into_iter()
@ -155,8 +155,11 @@ where
.register_find_node_answer(self.crypto_kind, filtered_v); .register_find_node_answer(self.crypto_kind, filtered_v);
self.clone().add_to_fanout_queue(&new_nodes); self.clone().add_to_fanout_queue(&new_nodes);
} }
Ok(None) => { #[allow(unused_variables)]
Ok(x) => {
// Call failed, node will not be considered again // Call failed, node will not be considered again
#[cfg(feature = "network-result-extra")]
log_rpc!(debug "Fanout result {}: {:?}", &next_node, x);
} }
Err(e) => { Err(e) => {
// Error happened, abort everything and return the error // Error happened, abort everything and return the error

View File

@ -469,24 +469,15 @@ impl RPCProcessor {
let call_routine = |next_node: NodeRef| { let call_routine = |next_node: NodeRef| {
let this = self.clone(); let this = self.clone();
async move { async move {
match this let v = network_result_try!(this
.clone() .clone()
.rpc_call_find_node( .rpc_call_find_node(
Destination::direct(next_node).with_safety(safety_selection), Destination::direct(next_node).with_safety(safety_selection),
node_id, node_id,
vec![], vec![],
) )
.await .await?);
{ Ok(NetworkResult::value(v.answer))
Ok(v) => {
let v = network_result_value_or_log!(v => [ format!(": node_id={} count={} fanout={} fanout={} safety_selection={:?}", node_id, count, fanout, timeout_us, safety_selection) ] {
// Any other failures, just try the next node
return Ok(None);
});
Ok(Some(v.answer))
}
Err(e) => Err(e),
}
} }
}; };
@ -636,7 +627,7 @@ impl RPCProcessor {
remote_private_route: PrivateRoute, remote_private_route: PrivateRoute,
reply_private_route: Option<PublicKey>, reply_private_route: Option<PublicKey>,
message_data: Vec<u8>, message_data: Vec<u8>,
) -> Result<NetworkResult<RenderedOperation>, RPCError> { ) -> RPCNetworkResult<RenderedOperation> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let rss = routing_table.route_spec_store(); let rss = routing_table.route_spec_store();
@ -650,17 +641,8 @@ impl RPCProcessor {
}; };
// Compile the safety route with the private route // Compile the safety route with the private route
let compiled_route: CompiledRoute = match rss let compiled_route: CompiledRoute = network_result_try!(rss
.compile_safety_route(safety_selection, remote_private_route) .compile_safety_route(safety_selection, remote_private_route).to_rpc_network_result()?);
.map_err(RPCError::internal)?
{
Some(cr) => cr,
None => {
return Ok(NetworkResult::no_connection_other(
"private route could not be compiled at this time",
))
}
};
let sr_is_stub = compiled_route.safety_route.is_stub(); let sr_is_stub = compiled_route.safety_route.is_stub();
let sr_pubkey = compiled_route.safety_route.public_key.value; let sr_pubkey = compiled_route.safety_route.public_key.value;
@ -721,7 +703,7 @@ impl RPCProcessor {
&self, &self,
dest: Destination, dest: Destination,
operation: &RPCOperation, operation: &RPCOperation,
) -> Result<NetworkResult<RenderedOperation>, RPCError> { ) ->RPCNetworkResult<RenderedOperation> {
let out: NetworkResult<RenderedOperation>; let out: NetworkResult<RenderedOperation>;
// Encode message to a builder and make a message reader for it // Encode message to a builder and make a message reader for it
@ -1160,7 +1142,7 @@ impl RPCProcessor {
dest: Destination, dest: Destination,
question: RPCQuestion, question: RPCQuestion,
context: Option<QuestionContext>, context: Option<QuestionContext>,
) -> Result<NetworkResult<WaitableReply>, RPCError> { ) ->RPCNetworkResult<WaitableReply> {
// Get sender peer info if we should send that // Get sender peer info if we should send that
let spi = self.get_sender_peer_info(&dest); let spi = self.get_sender_peer_info(&dest);
@ -1256,7 +1238,7 @@ impl RPCProcessor {
&self, &self,
dest: Destination, dest: Destination,
statement: RPCStatement, statement: RPCStatement,
) -> Result<NetworkResult<()>, RPCError> { ) ->RPCNetworkResult<()> {
// Get sender peer info if we should send that // Get sender peer info if we should send that
let spi = self.get_sender_peer_info(&dest); let spi = self.get_sender_peer_info(&dest);
@ -1331,7 +1313,7 @@ impl RPCProcessor {
&self, &self,
request: RPCMessage, request: RPCMessage,
answer: RPCAnswer, answer: RPCAnswer,
) -> Result<NetworkResult<()>, RPCError> { ) ->RPCNetworkResult<()> {
// Extract destination from respond_to // Extract destination from respond_to
let dest = network_result_try!(self.get_respond_to_destination(&request)); let dest = network_result_try!(self.get_respond_to_destination(&request));
@ -1457,7 +1439,7 @@ impl RPCProcessor {
async fn process_rpc_message( async fn process_rpc_message(
&self, &self,
encoded_msg: RPCMessageEncoded, encoded_msg: RPCMessageEncoded,
) -> Result<NetworkResult<()>, RPCError> { ) ->RPCNetworkResult<()> {
let address_filter = self.network_manager.address_filter(); let address_filter = self.network_manager.address_filter();
// Decode operation appropriately based on header detail // Decode operation appropriately based on header detail

View File

@ -11,7 +11,7 @@ impl RPCProcessor {
self, self,
dest: Destination, dest: Destination,
message: Vec<u8>, message: Vec<u8>,
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> { ) -> RPCNetworkResult<Answer<Vec<u8>>> {
let debug_string = format!("AppCall(message(len)={}) => {}", message.len(), dest); let debug_string = format!("AppCall(message(len)={}) => {}", message.len(), dest);
let app_call_q = RPCOperationAppCallQ::new(message)?; let app_call_q = RPCOperationAppCallQ::new(message)?;
@ -49,10 +49,7 @@ impl RPCProcessor {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_app_call_q( pub(crate) async fn process_app_call_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
let routing_table = self.routing_table(); let routing_table = self.routing_table();

View File

@ -11,7 +11,7 @@ impl RPCProcessor {
self, self,
dest: Destination, dest: Destination,
message: Vec<u8>, message: Vec<u8>,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
let app_message = RPCOperationAppMessage::new(message)?; let app_message = RPCOperationAppMessage::new(message)?;
let statement = RPCStatement::new(RPCStatementDetail::AppMessage(Box::new(app_message))); let statement = RPCStatement::new(RPCStatementDetail::AppMessage(Box::new(app_message)));
@ -20,10 +20,7 @@ impl RPCProcessor {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_app_message( pub(crate) async fn process_app_message(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); let opi = routing_table.get_own_peer_info(msg.header.routing_domain());

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor { impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_cancel_tunnel_q( pub(crate) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
{ {

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor { impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_complete_tunnel_q( pub(crate) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
{ {

View File

@ -13,6 +13,8 @@ pub enum RPCError {
Internal(String), Internal(String),
#[error("[RPCError: Network({0})]")] #[error("[RPCError: Network({0})]")]
Network(String), Network(String),
#[error("[RPCError: TryAgain({0})]")]
TryAgain(String),
} }
impl RPCError { impl RPCError {
@ -56,6 +58,25 @@ impl From<RPCError> for VeilidAPIError {
RPCError::Protocol(message) => VeilidAPIError::Generic { message }, RPCError::Protocol(message) => VeilidAPIError::Generic { message },
RPCError::Internal(message) => VeilidAPIError::Internal { message }, RPCError::Internal(message) => VeilidAPIError::Internal { message },
RPCError::Network(message) => VeilidAPIError::Generic { message }, RPCError::Network(message) => VeilidAPIError::Generic { message },
RPCError::TryAgain(message) => VeilidAPIError::TryAgain { message },
}
}
}
pub(crate) type RPCNetworkResult<T> = Result<NetworkResult<T>, RPCError>;
pub(crate) trait ToRPCNetworkResult<T> {
fn to_rpc_network_result(self) -> RPCNetworkResult<T>;
}
impl<T> ToRPCNetworkResult<T> for VeilidAPIResult<T> {
fn to_rpc_network_result(self) -> RPCNetworkResult<T> {
match self {
Err(VeilidAPIError::TryAgain { message }) => Err(RPCError::TryAgain(message)),
Err(VeilidAPIError::Timeout) => Ok(NetworkResult::timeout()),
Err(VeilidAPIError::Unimplemented { message }) => Err(RPCError::Unimplemented(message)),
Err(e) => Err(RPCError::internal(e)),
Ok(v) => Ok(NetworkResult::value(v)),
} }
} }
} }

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor { impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_find_block_q( pub(crate) async fn process_find_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
{ {

View File

@ -16,7 +16,7 @@ impl RPCProcessor {
dest: Destination, dest: Destination,
node_id: TypedKey, node_id: TypedKey,
capabilities: Vec<Capability>, capabilities: Vec<Capability>,
) -> Result<NetworkResult<Answer<Vec<PeerInfo>>>, RPCError> { ) -> RPCNetworkResult<Answer<Vec<PeerInfo>>> {
// Ensure destination never has a private route // Ensure destination never has a private route
if matches!( if matches!(
dest, dest,
@ -78,10 +78,7 @@ impl RPCProcessor {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_find_node_q( pub(crate) async fn process_find_node_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ensure this never came over a private route, safety route is okay though // Ensure this never came over a private route, safety route is okay though
match &msg.header.detail { match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}

View File

@ -32,7 +32,7 @@ impl RPCProcessor {
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
last_descriptor: Option<SignedValueDescriptor>, last_descriptor: Option<SignedValueDescriptor>,
) -> Result<NetworkResult<Answer<GetValueAnswer>>, RPCError> { ) ->RPCNetworkResult<Answer<GetValueAnswer>> {
// Ensure destination never has a private route // Ensure destination never has a private route
// and get the target noderef so we can validate the response // and get the target noderef so we can validate the response
let Some(target) = dest.target() else { let Some(target) = dest.target() else {
@ -168,7 +168,7 @@ impl RPCProcessor {
pub(crate) async fn process_get_value_q( pub(crate) async fn process_get_value_q(
&self, &self,
msg: RPCMessage, msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> { ) ->RPCNetworkResult<()> {
// Ensure this never came over a private route, safety route is okay though // Ensure this never came over a private route, safety route is okay though
match &msg.header.detail { match &msg.header.detail {

View File

@ -11,7 +11,7 @@ impl RPCProcessor {
self, self,
dest: Destination, dest: Destination,
receipt: D, receipt: D,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
let receipt = receipt.as_ref().to_vec(); let receipt = receipt.as_ref().to_vec();
let return_receipt = RPCOperationReturnReceipt::new(receipt)?; let return_receipt = RPCOperationReturnReceipt::new(receipt)?;
@ -25,10 +25,7 @@ impl RPCProcessor {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_return_receipt( pub(crate) async fn process_return_receipt(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the statement // Get the statement
let (_, _, _, kind) = msg.operation.destructure(); let (_, _, _, kind) = msg.operation.destructure();
let receipt = match kind { let receipt = match kind {

View File

@ -10,7 +10,7 @@ impl RPCProcessor {
routed_operation: RoutedOperation, routed_operation: RoutedOperation,
route_hop: RouteHop, route_hop: RouteHop,
safety_route: SafetyRoute, safety_route: SafetyRoute,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
// Make sure hop count makes sense // Make sure hop count makes sense
if safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count { if safety_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
return Ok(NetworkResult::invalid_message( return Ok(NetworkResult::invalid_message(
@ -69,7 +69,7 @@ impl RPCProcessor {
next_route_node: RouteNode, next_route_node: RouteNode,
safety_route_public_key: TypedKey, safety_route_public_key: TypedKey,
next_private_route: PrivateRoute, next_private_route: PrivateRoute,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
// Make sure hop count makes sense // Make sure hop count makes sense
if next_private_route.hop_count as usize > self.unlocked_inner.max_route_hop_count { if next_private_route.hop_count as usize > self.unlocked_inner.max_route_hop_count {
return Ok(NetworkResult::invalid_message( return Ok(NetworkResult::invalid_message(
@ -122,7 +122,7 @@ impl RPCProcessor {
vcrypto: CryptoSystemVersion, vcrypto: CryptoSystemVersion,
routed_operation: RoutedOperation, routed_operation: RoutedOperation,
remote_sr_pubkey: TypedKey, remote_sr_pubkey: TypedKey,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
// Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret) // Now that things are valid, decrypt the routed operation with DEC(nonce, DH(the SR's public key, the PR's (or node's) secret)
// xxx: punish nodes that send messages that fail to decrypt eventually? How to do this for safety routes? // xxx: punish nodes that send messages that fail to decrypt eventually? How to do this for safety routes?
let node_id_secret = self.routing_table.node_id_secret_key(remote_sr_pubkey.kind); let node_id_secret = self.routing_table.node_id_secret_key(remote_sr_pubkey.kind);
@ -170,7 +170,7 @@ impl RPCProcessor {
routed_operation: RoutedOperation, routed_operation: RoutedOperation,
remote_sr_pubkey: TypedKey, remote_sr_pubkey: TypedKey,
pr_pubkey: TypedKey, pr_pubkey: TypedKey,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
// Get sender id of the peer with the crypto kind of the route // Get sender id of the peer with the crypto kind of the route
let Some(sender_id) = detail.peer_noderef.node_ids().get(pr_pubkey.kind) else { let Some(sender_id) = detail.peer_noderef.node_ids().get(pr_pubkey.kind) else {
return Ok(NetworkResult::invalid_message( return Ok(NetworkResult::invalid_message(
@ -246,7 +246,7 @@ impl RPCProcessor {
routed_operation: RoutedOperation, routed_operation: RoutedOperation,
remote_sr_pubkey: TypedKey, remote_sr_pubkey: TypedKey,
pr_pubkey: TypedKey, pr_pubkey: TypedKey,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
// If the private route public key is our node id, then this was sent via safety route to our node directly // If the private route public key is our node id, then this was sent via safety route to our node directly
// so there will be no signatures to validate // so there will be no signatures to validate
if self.routing_table.node_ids().contains(&pr_pubkey) { if self.routing_table.node_ids().contains(&pr_pubkey) {
@ -277,7 +277,7 @@ impl RPCProcessor {
mut routed_operation: RoutedOperation, mut routed_operation: RoutedOperation,
sr_pubkey: TypedKey, sr_pubkey: TypedKey,
mut private_route: PrivateRoute, mut private_route: PrivateRoute,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
let Some(pr_first_hop) = private_route.pop_first_hop() else { let Some(pr_first_hop) = private_route.pop_first_hop() else {
return Ok(NetworkResult::invalid_message( return Ok(NetworkResult::invalid_message(
"switching from safety route to private route requires first hop", "switching from safety route to private route requires first hop",
@ -341,7 +341,7 @@ impl RPCProcessor {
route_hop_data: &RouteHopData, route_hop_data: &RouteHopData,
pr_pubkey: &TypedKey, pr_pubkey: &TypedKey,
route_operation: &mut RoutedOperation, route_operation: &mut RoutedOperation,
) -> Result<NetworkResult<RouteHop>, RPCError> { ) -> RPCNetworkResult<RouteHop> {
// Get crypto kind // Get crypto kind
let crypto_kind = pr_pubkey.kind; let crypto_kind = pr_pubkey.kind;
let Some(vcrypto) = self.crypto.get(crypto_kind) else { let Some(vcrypto) = self.crypto.get(crypto_kind) else {
@ -402,10 +402,7 @@ impl RPCProcessor {
feature = "verbose-tracing", feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret, err) instrument(level = "trace", skip(self), ret, err)
)] )]
pub(crate) async fn process_route( pub(crate) async fn process_route(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
let routing_table = self.routing_table(); let routing_table = self.routing_table();
if !routing_table.has_valid_network_class(msg.header.routing_domain()) { if !routing_table.has_valid_network_class(msg.header.routing_domain()) {

View File

@ -36,7 +36,7 @@ impl RPCProcessor {
value: SignedValueData, value: SignedValueData,
descriptor: SignedValueDescriptor, descriptor: SignedValueDescriptor,
send_descriptor: bool, send_descriptor: bool,
) -> Result<NetworkResult<Answer<SetValueAnswer>>, RPCError> { ) ->RPCNetworkResult<Answer<SetValueAnswer>> {
// Ensure destination never has a private route // Ensure destination never has a private route
// and get the target noderef so we can validate the response // and get the target noderef so we can validate the response
let Some(target) = dest.target() else { let Some(target) = dest.target() else {
@ -182,7 +182,7 @@ impl RPCProcessor {
pub(crate) async fn process_set_value_q( pub(crate) async fn process_set_value_q(
&self, &self,
msg: RPCMessage, msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> { ) ->RPCNetworkResult<()> {
// Ignore if disabled // Ignore if disabled
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); let opi = routing_table.get_own_peer_info(msg.header.routing_domain());

View File

@ -11,7 +11,7 @@ impl RPCProcessor {
self, self,
dest: Destination, dest: Destination,
signal_info: SignalInfo, signal_info: SignalInfo,
) -> Result<NetworkResult<()>, RPCError> { ) -> RPCNetworkResult<()> {
// Ensure destination never has a private route // Ensure destination never has a private route
if matches!( if matches!(
dest, dest,
@ -33,10 +33,7 @@ impl RPCProcessor {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_signal( pub(crate) async fn process_signal(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); let opi = routing_table.get_own_peer_info(msg.header.routing_domain());

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor { impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_start_tunnel_q( pub(crate) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
{ {

View File

@ -22,7 +22,7 @@ impl RPCProcessor {
pub async fn rpc_call_status( pub async fn rpc_call_status(
self, self,
dest: Destination, dest: Destination,
) -> Result<NetworkResult<Answer<Option<SenderInfo>>>, RPCError> { ) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() { let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() {
SafetySelection::Unsafe(_) => { SafetySelection::Unsafe(_) => {
let (opt_target_nr, routing_domain) = match &dest { let (opt_target_nr, routing_domain) = match &dest {
@ -197,10 +197,7 @@ impl RPCProcessor {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_status_q( pub(crate) async fn process_status_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the question // Get the question
let kind = msg.operation.kind().clone(); let kind = msg.operation.kind().clone();
let status_q = match kind { let status_q = match kind {

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor { impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_supply_block_q( pub(crate) async fn process_supply_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
{ {

View File

@ -56,10 +56,7 @@ impl RPCProcessor {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_validate_dial_info( pub(crate) async fn process_validate_dial_info(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
if !routing_table.has_valid_network_class(msg.header.routing_domain()) { if !routing_table.has_valid_network_class(msg.header.routing_domain()) {
return Ok(NetworkResult::service_unavailable( return Ok(NetworkResult::service_unavailable(

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor { impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err))]
pub(crate) async fn process_value_changed( pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); let opi = routing_table.get_own_peer_info(msg.header.routing_domain());

View File

@ -2,10 +2,7 @@ use super::*;
impl RPCProcessor { impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_watch_value_q( pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled // Ignore if disabled
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); let opi = routing_table.get_own_peer_info(msg.header.routing_domain());

View File

@ -54,7 +54,8 @@ impl StorageManager {
let context = context.clone(); let context = context.clone();
let last_descriptor = last_subkey_result.descriptor.clone(); let last_descriptor = last_subkey_result.descriptor.clone();
async move { async move {
let vres = rpc_processor let gva = network_result_try!(
rpc_processor
.clone() .clone()
.rpc_call_get_value( .rpc_call_get_value(
Destination::direct(next_node.clone()).with_safety(safety_selection), Destination::direct(next_node.clone()).with_safety(safety_selection),
@ -62,11 +63,8 @@ impl StorageManager {
subkey, subkey,
last_descriptor, last_descriptor,
) )
.await?; .await?
let gva = network_result_value_or_log!(vres => [ format!(": next_node={} safety_selection={:?} key={} subkey={}", next_node, safety_selection, key, subkey) ] { );
// Any other failures, just try the next node
return Ok(None);
});
// Keep the descriptor if we got one. If we had a last_descriptor it will // Keep the descriptor if we got one. If we had a last_descriptor it will
// already be validated by rpc_call_get_value // already be validated by rpc_call_get_value
@ -87,8 +85,9 @@ impl StorageManager {
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it // Got a value but no descriptor for it
// Move to the next node // Move to the next node
log_stor!(debug "Got value with no descriptor"); return Ok(NetworkResult::invalid_message(
return Ok(None); "Got value with no descriptor",
));
}; };
// Validate with schema // Validate with schema
@ -99,8 +98,10 @@ impl StorageManager {
) { ) {
// Validation failed, ignore this value // Validation failed, ignore this value
// Move to the next node // Move to the next node
log_stor!(debug "Schema validation failed on subkey {}", subkey); return Ok(NetworkResult::invalid_message(format!(
return Ok(None); "Schema validation failed on subkey {}",
subkey
)));
} }
// If we have a prior value, see if this is a newer sequence number // If we have a prior value, see if this is a newer sequence number
@ -112,7 +113,7 @@ impl StorageManager {
// If sequence number is the same, the data should be the same // If sequence number is the same, the data should be the same
if prior_value.value_data() != value.value_data() { if prior_value.value_data() != value.value_data() {
// Move to the next node // Move to the next node
return Ok(None); return Ok(NetworkResult::invalid_message("value data mismatch"));
} }
// Increase the consensus count for the existing value // Increase the consensus count for the existing value
ctx.value_count += 1; ctx.value_count += 1;
@ -136,7 +137,7 @@ impl StorageManager {
#[cfg(feature = "network-result-extra")] #[cfg(feature = "network-result-extra")]
log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
Ok(Some(gva.answer.peers)) Ok(NetworkResult::value(gva.answer.peers))
} }
}; };

View File

@ -209,8 +209,7 @@ impl StorageManager {
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else { let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, try again later apibail_try_again!("offline, try again later");
apibail_try_again!();
}; };
// Drop the mutex so we dont block during network access // Drop the mutex so we dont block during network access
@ -310,8 +309,7 @@ impl StorageManager {
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else { let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, try again later apibail_try_again!("offline, try again later");
apibail_try_again!();
}; };
// Drop the lock for network access // Drop the lock for network access

View File

@ -337,7 +337,7 @@ where
.unwrap(); .unwrap();
if !self.total_storage_space.check_limit() { if !self.total_storage_space.check_limit() {
self.total_storage_space.rollback(); self.total_storage_space.rollback();
apibail_try_again!(); apibail_try_again!("out of storage space");
} }
// Save to record table // Save to record table
@ -650,7 +650,7 @@ where
.add(new_subkey_size as u64) .add(new_subkey_size as u64)
.unwrap(); .unwrap();
if !self.total_storage_space.check_limit() { if !self.total_storage_space.check_limit() {
apibail_try_again!(); apibail_try_again!("out of storage space");
} }
// Write subkey // Write subkey

View File

@ -60,7 +60,8 @@ impl StorageManager {
}; };
// send across the wire // send across the wire
let vres = rpc_processor let sva = network_result_try!(
rpc_processor
.clone() .clone()
.rpc_call_set_value( .rpc_call_set_value(
Destination::direct(next_node.clone()).with_safety(safety_selection), Destination::direct(next_node.clone()).with_safety(safety_selection),
@ -70,11 +71,8 @@ impl StorageManager {
descriptor.clone(), descriptor.clone(),
send_descriptor, send_descriptor,
) )
.await?; .await?
let sva = network_result_value_or_log!(vres => [ format!(": next_node={} safety_selection={:?} key={} subkey={} send_descriptor={}", next_node, safety_selection, key, subkey, send_descriptor) ] { );
// Any other failures, just try the next node and pretend this one never happened
return Ok(None);
});
// If the node was close enough to possibly set the value // If the node was close enough to possibly set the value
if sva.answer.set { if sva.answer.set {
@ -91,7 +89,7 @@ impl StorageManager {
value.value_data(), value.value_data(),
) { ) {
// Validation failed, ignore this value and pretend we never saw this node // Validation failed, ignore this value and pretend we never saw this node
return Ok(None); return Ok(NetworkResult::invalid_message("Schema validation failed"));
} }
// We have a prior value, ensure this is a newer sequence number // We have a prior value, ensure this is a newer sequence number
@ -107,7 +105,7 @@ impl StorageManager {
// If the sequence number is older, or an equal sequence number, // If the sequence number is older, or an equal sequence number,
// node should have not returned a value here. // node should have not returned a value here.
// Skip this node and its closer list because it is misbehaving // Skip this node and its closer list because it is misbehaving
return Ok(None); return Ok(NetworkResult::invalid_message("Sequence number is older"));
} }
} else { } else {
// It was set on this node and no newer value was found and returned, // It was set on this node and no newer value was found and returned,
@ -124,7 +122,7 @@ impl StorageManager {
#[cfg(feature = "network-result-extra")] #[cfg(feature = "network-result-extra")]
log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len()); log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
Ok(Some(sva.answer.peers)) Ok(NetworkResult::value(sva.answer.peers))
} }
}; };

View File

@ -200,9 +200,11 @@ impl VeilidAPI {
/// `VLD0:XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` but if the prefix is left off /// `VLD0:XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` but if the prefix is left off
/// `XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` will be parsed with the 'best' cryptosystem /// `XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` will be parsed with the 'best' cryptosystem
/// available (at the time of this writing this is `VLD0`) /// available (at the time of this writing this is `VLD0`)
pub async fn parse_as_target<S: AsRef<str>>(&self, s: S) -> VeilidAPIResult<Target> { pub async fn parse_as_target<S: ToString>(&self, s: S) -> VeilidAPIResult<Target> {
let s = s.to_string();
// Is this a route id? // Is this a route id?
if let Ok(rrid) = RouteId::from_str(s.as_ref()) { if let Ok(rrid) = RouteId::from_str(&s) {
let routing_table = self.routing_table()?; let routing_table = self.routing_table()?;
let rss = routing_table.route_spec_store(); let rss = routing_table.route_spec_store();
@ -213,11 +215,11 @@ impl VeilidAPI {
} }
// Is this a node id? // Is this a node id?
if let Ok(nid) = TypedKey::from_str(s.as_ref()) { if let Ok(nid) = TypedKey::from_str(&s) {
return Ok(Target::NodeId(nid)); return Ok(Target::NodeId(nid));
} }
Err(VeilidAPIError::invalid_target()) Err(VeilidAPIError::parse_error("Unable to parse as target", s))
} }
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
@ -261,40 +263,28 @@ impl VeilidAPI {
}; };
let rss = self.routing_table()?.route_spec_store(); let rss = self.routing_table()?.route_spec_store();
let r = rss let route_id = rss.allocate_route(
.allocate_route(
crypto_kinds, crypto_kinds,
stability, stability,
sequencing, sequencing,
default_route_hop_count, default_route_hop_count,
Direction::Inbound.into(), Direction::Inbound.into(),
&[], &[],
) )?;
.map_err(VeilidAPIError::internal)?; if !rss.test_route(route_id).await? {
let Some(route_id) = r else {
apibail_generic!("unable to allocate route");
};
if !rss
.test_route(route_id)
.await
.map_err(VeilidAPIError::no_connection)?
{
rss.release_route(route_id); rss.release_route(route_id);
apibail_generic!("allocated route failed to test"); apibail_generic!("allocated route failed to test");
} }
let private_routes = rss let private_routes = rss.assemble_private_routes(&route_id, Some(true))?;
.assemble_private_routes(&route_id, Some(true))
.map_err(VeilidAPIError::generic)?;
let blob = match RouteSpecStore::private_routes_to_blob(&private_routes) { let blob = match RouteSpecStore::private_routes_to_blob(&private_routes) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
rss.release_route(route_id); rss.release_route(route_id);
apibail_internal!(e); return Err(e);
} }
}; };
rss.mark_route_published(&route_id, true) rss.mark_route_published(&route_id, true)?;
.map_err(VeilidAPIError::internal)?;
Ok((route_id, blob)) Ok((route_id, blob))
} }
@ -305,7 +295,6 @@ impl VeilidAPI {
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> { pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
let rss = self.routing_table()?.route_spec_store(); let rss = self.routing_table()?.route_spec_store();
rss.import_remote_private_route(blob) rss.import_remote_private_route(blob)
.map_err(|e| VeilidAPIError::invalid_argument(e, "blob", "private route blob"))
} }
/// Release either a locally allocated or remotely imported private route /// Release either a locally allocated or remotely imported private route

View File

@ -1096,8 +1096,7 @@ impl VeilidAPI {
directions, directions,
&[], &[],
) { ) {
Ok(Some(v)) => format!("{}", v), Ok(v) => v.to_string(),
Ok(None) => "<unavailable>".to_string(),
Err(e) => { Err(e) => {
format!("Route allocation failed: {}", e) format!("Route allocation failed: {}", e)
} }

View File

@ -19,8 +19,8 @@ macro_rules! apibail_timeout {
#[allow(unused_macros)] #[allow(unused_macros)]
#[macro_export] #[macro_export]
macro_rules! apibail_try_again { macro_rules! apibail_try_again {
() => { ($x:expr) => {
return Err(VeilidAPIError::try_again()) return Err(VeilidAPIError::try_again($x))
}; };
} }
@ -83,8 +83,8 @@ macro_rules! apibail_key_not_found {
#[allow(unused_macros)] #[allow(unused_macros)]
#[macro_export] #[macro_export]
macro_rules! apibail_invalid_target { macro_rules! apibail_invalid_target {
() => { ($x:expr) => {
return Err(VeilidAPIError::invalid_target()) return Err(VeilidAPIError::invalid_target($x))
}; };
} }
@ -116,12 +116,12 @@ pub enum VeilidAPIError {
AlreadyInitialized, AlreadyInitialized,
#[error("Timeout")] #[error("Timeout")]
Timeout, Timeout,
#[error("TryAgain")] #[error("TryAgain: {message}")]
TryAgain, TryAgain { message: String },
#[error("Shutdown")] #[error("Shutdown")]
Shutdown, Shutdown,
#[error("Invalid target")] #[error("Invalid target: {message}")]
InvalidTarget, InvalidTarget { message: String },
#[error("No connection: {message}")] #[error("No connection: {message}")]
NoConnection { message: String }, NoConnection { message: String },
#[error("Key not found: {key}")] #[error("Key not found: {key}")]
@ -158,14 +158,18 @@ impl VeilidAPIError {
pub fn timeout() -> Self { pub fn timeout() -> Self {
Self::Timeout Self::Timeout
} }
pub fn try_again() -> Self { pub fn try_again<T: ToString>(msg: T) -> Self {
Self::TryAgain Self::TryAgain {
message: msg.to_string(),
}
} }
pub fn shutdown() -> Self { pub fn shutdown() -> Self {
Self::Shutdown Self::Shutdown
} }
pub fn invalid_target() -> Self { pub fn invalid_target<T: ToString>(msg: T) -> Self {
Self::InvalidTarget Self::InvalidTarget {
message: msg.to_string(),
}
} }
pub fn no_connection<T: ToString>(msg: T) -> Self { pub fn no_connection<T: ToString>(msg: T) -> Self {
Self::NoConnection { Self::NoConnection {
@ -213,6 +217,21 @@ impl VeilidAPIError {
message: msg.to_string(), message: msg.to_string(),
} }
} }
pub(crate) fn from_network_result<T>(nr: NetworkResult<T>) -> Result<T, Self> {
match nr {
NetworkResult::Timeout => Err(VeilidAPIError::timeout()),
NetworkResult::ServiceUnavailable(m) => Err(VeilidAPIError::invalid_target(m)),
NetworkResult::NoConnection(m) => Err(VeilidAPIError::no_connection(m)),
NetworkResult::AlreadyExists(m) => {
Err(VeilidAPIError::generic(format!("Already exists: {}", m)))
}
NetworkResult::InvalidMessage(m) => {
Err(VeilidAPIError::parse_error("Invalid message", m))
}
NetworkResult::Value(v) => Ok(v),
}
}
} }
pub type VeilidAPIResult<T> = Result<T, VeilidAPIError>; pub type VeilidAPIResult<T> = Result<T, VeilidAPIError>;

View File

@ -220,7 +220,7 @@ impl JsonRequestProcessor {
return Ok(Target::NodeId(nid)); return Ok(Target::NodeId(nid));
} }
Err(VeilidAPIError::invalid_target()) Err(VeilidAPIError::parse_error("Unable to parse as target", s))
} }
////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////

View File

@ -126,7 +126,7 @@ impl RoutingContext {
.await .await
{ {
Ok(Some(nr)) => nr, Ok(Some(nr)) => nr,
Ok(None) => apibail_invalid_target!(), Ok(None) => apibail_invalid_target!("could not resolve node id"),
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
}; };
// Apply sequencing to match safety selection // Apply sequencing to match safety selection
@ -142,7 +142,7 @@ impl RoutingContext {
let rss = self.api.routing_table()?.route_spec_store(); let rss = self.api.routing_table()?.route_spec_store();
let Some(private_route) = rss.best_remote_private_route(&rsid) else { let Some(private_route) = rss.best_remote_private_route(&rsid) else {
apibail_invalid_target!(); apibail_invalid_target!("could not get remote private route");
}; };
Ok(rpc_processor::Destination::PrivateRoute { Ok(rpc_processor::Destination::PrivateRoute {
@ -174,10 +174,7 @@ impl RoutingContext {
let answer = match rpc_processor.rpc_call_app_call(dest, message).await { let answer = match rpc_processor.rpc_call_app_call(dest, message).await {
Ok(NetworkResult::Value(v)) => v, Ok(NetworkResult::Value(v)) => v,
Ok(NetworkResult::Timeout) => apibail_timeout!(), Ok(NetworkResult::Timeout) => apibail_timeout!(),
Ok(NetworkResult::ServiceUnavailable(e)) => { Ok(NetworkResult::ServiceUnavailable(e)) => apibail_invalid_target!(e),
log_network_result!(format!("app_call: ServiceUnavailable({})", e));
apibail_try_again!()
}
Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => { Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
apibail_no_connection!(e); apibail_no_connection!(e);
} }
@ -207,10 +204,7 @@ impl RoutingContext {
match rpc_processor.rpc_call_app_message(dest, message).await { match rpc_processor.rpc_call_app_message(dest, message).await {
Ok(NetworkResult::Value(())) => {} Ok(NetworkResult::Value(())) => {}
Ok(NetworkResult::Timeout) => apibail_timeout!(), Ok(NetworkResult::Timeout) => apibail_timeout!(),
Ok(NetworkResult::ServiceUnavailable(e)) => { Ok(NetworkResult::ServiceUnavailable(e)) => apibail_invalid_target!(e),
log_network_result!(format!("app_message: ServiceUnavailable({})", e));
apibail_try_again!()
}
Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => { Ok(NetworkResult::NoConnection(e)) | Ok(NetworkResult::AlreadyExists(e)) => {
apibail_no_connection!(e); apibail_no_connection!(e);
} }

View File

@ -403,7 +403,7 @@ packages:
path: ".." path: ".."
relative: true relative: true
source: path source: path
version: "0.2.3" version: "0.2.4"
web: web:
dependency: transitive dependency: transitive
description: description:

View File

@ -22,7 +22,7 @@ abstract class VeilidAPIException implements Exception {
} }
case 'TryAgain': case 'TryAgain':
{ {
return VeilidAPIExceptionTryAgain(); return VeilidAPIExceptionTryAgain(json['message']! as String);
} }
case 'Shutdown': case 'Shutdown':
{ {
@ -30,7 +30,7 @@ abstract class VeilidAPIException implements Exception {
} }
case 'InvalidTarget': case 'InvalidTarget':
{ {
return VeilidAPIExceptionInvalidTarget(); return VeilidAPIExceptionInvalidTarget(json['message']! as String);
} }
case 'NoConnection': case 'NoConnection':
{ {
@ -108,11 +108,14 @@ class VeilidAPIExceptionTimeout implements VeilidAPIException {
@immutable @immutable
class VeilidAPIExceptionTryAgain implements VeilidAPIException { class VeilidAPIExceptionTryAgain implements VeilidAPIException {
//
const VeilidAPIExceptionTryAgain(this.message);
final String message;
@override @override
String toString() => 'VeilidAPIException: TryAgain'; String toString() => 'VeilidAPIException: TryAgain (message: $message)';
@override @override
String toDisplayError() => 'Try again'; String toDisplayError() => 'Try again: (message: $message)';
} }
@immutable @immutable
@ -126,11 +129,15 @@ class VeilidAPIExceptionShutdown implements VeilidAPIException {
@immutable @immutable
class VeilidAPIExceptionInvalidTarget implements VeilidAPIException { class VeilidAPIExceptionInvalidTarget implements VeilidAPIException {
@override //
String toString() => 'VeilidAPIException: InvalidTarget'; const VeilidAPIExceptionInvalidTarget(this.message);
final String message;
@override @override
String toDisplayError() => 'Invalid target'; String toString() => 'VeilidAPIException: InvalidTarget (message: $message)';
@override
String toDisplayError() => 'Invalid target: (message: $message)';
} }
@immutable @immutable

View File

@ -26,20 +26,21 @@ rt-tokio = [
"tokio-util", "tokio-util",
"opentelemetry/rt-tokio", "opentelemetry/rt-tokio",
] ]
debug-load = ["dep:ctor", "dep:libc-print", "dep:android_log-sys", "dep:oslog"]
[dependencies] [dependencies]
veilid-core = { path = "../../veilid-core", default-features = false } veilid-core = { path = "../../veilid-core", default-features = false }
tracing = { version = "^0", features = ["log", "attributes"] } tracing = { version = "0.1.37", features = ["log", "attributes"] }
tracing-subscriber = "^0" tracing-subscriber = "0.3.17"
parking_lot = "^0" parking_lot = "0.12.1"
backtrace = "^0" backtrace = "0.3.69"
serde_json = "^1" serde_json = "1.0.107"
serde = "^1" serde = "1.0.188"
futures-util = { version = "^0", default-features = false, features = [ futures-util = { version = "0.3.28", default-features = false, features = [
"alloc", "alloc",
] } ] }
cfg-if = "^1" cfg-if = "1.0.0"
data-encoding = { version = "^2" } data-encoding = { version = "2.4.0" }
# Dependencies for native builds only # Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android # Linux, Windows, Mac, iOS, Android
@ -48,19 +49,27 @@ tracing-opentelemetry = "0.21"
opentelemetry = { version = "0.20" } opentelemetry = { version = "0.20" }
opentelemetry-otlp = { version = "0.13" } opentelemetry-otlp = { version = "0.13" }
opentelemetry-semantic-conventions = "0.12" opentelemetry-semantic-conventions = "0.12"
async-std = { version = "^1", features = ["unstable"], optional = true } async-std = { version = "1.12.0", features = ["unstable"], optional = true }
tokio = { version = "^1", features = ["full"], optional = true } tokio = { version = "1.32.0", features = ["full"], optional = true }
tokio-stream = { version = "^0", features = ["net"], optional = true } tokio-stream = { version = "0.1.14", features = ["net"], optional = true }
tokio-util = { version = "^0", features = ["compat"], optional = true } tokio-util = { version = "0.7.8", features = ["compat"], optional = true }
allo-isolate = "^0" allo-isolate = "0.1.20"
ffi-support = "^0" ffi-support = "0.4.4"
lazy_static = "^1" lazy_static = "1.4.0"
hostname = "^0" hostname = "0.3.1"
ctor = { version = "0.2.5", optional = true }
libc-print = { version = "0.1.22", optional = true }
# Dependencies for WASM builds only # Dependencies for WASM builds only
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
# Dependencies for Android builds only # Dependencies for Android builds only
[target.'cfg(target_os = "android")'.dependencies] [target.'cfg(target_os = "android")'.dependencies]
jni = "^0" jni = "0.21.1"
paranoid-android = "0.2.1" paranoid-android = "0.2.1"
android_log-sys = { version = "0.3.1", optional = true }
# Dependencies for Android builds only
[target.'cfg(target_os = "ios")'.dependencies]
oslog = { version = "0.2.0", default-features = false, optional = true }

File diff suppressed because it is too large Load Diff

View File

@ -105,7 +105,9 @@ async def test_routing_context_app_call_loopback():
await api.debug("purge routes") await api.debug("purge routes")
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await (await api.new_routing_context()).with_privacy() rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()

View File

@ -55,6 +55,7 @@ class VeilidAPIErrorTryAgain(VeilidAPIError):
"""Operation could not be performed at this time, retry again later""" """Operation could not be performed at this time, retry again later"""
label = "Try again" label = "Try again"
message: str
@dataclass @dataclass
@ -69,6 +70,7 @@ class VeilidAPIErrorInvalidTarget(VeilidAPIError):
"""Target of operation is not valid""" """Target of operation is not valid"""
label = "Invalid target" label = "Invalid target"
message: str
@dataclass @dataclass

View File

@ -2974,7 +2974,8 @@
{ {
"type": "object", "type": "object",
"required": [ "required": [
"kind" "kind",
"message"
], ],
"properties": { "properties": {
"kind": { "kind": {
@ -2982,6 +2983,9 @@
"enum": [ "enum": [
"TryAgain" "TryAgain"
] ]
},
"message": {
"type": "string"
} }
} }
}, },
@ -3002,7 +3006,8 @@
{ {
"type": "object", "type": "object",
"required": [ "required": [
"kind" "kind",
"message"
], ],
"properties": { "properties": {
"kind": { "kind": {
@ -3010,6 +3015,9 @@
"enum": [ "enum": [
"InvalidTarget" "InvalidTarget"
] ]
},
"message": {
"type": "string"
} }
} }
}, },

View File

@ -413,7 +413,7 @@ impl AssemblyBuffer {
.await; .await;
// Get a message seq // Get a message seq
let seq = self.unlocked_inner.next_seq.fetch_add(1, Ordering::Relaxed); let seq = self.unlocked_inner.next_seq.fetch_add(1, Ordering::AcqRel);
// Chunk it up // Chunk it up
let mut offset = 0usize; let mut offset = 0usize;

View File

@ -218,7 +218,7 @@ impl<T> NetworkResult<T> {
Self::Value(v) => NetworkResult::<X>::Value(f(v)), Self::Value(v) => NetworkResult::<X>::Value(f(v)),
} }
} }
pub fn into_result(self) -> Result<T, io::Error> { pub fn into_io_result(self) -> Result<T, io::Error> {
match self { match self {
Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")), Self::Timeout => Err(io::Error::new(io::ErrorKind::TimedOut, "Timed out")),
Self::ServiceUnavailable(s) => Err(io::Error::new( Self::ServiceUnavailable(s) => Err(io::Error::new(

View File

@ -52,7 +52,7 @@ pub async fn test_single_out_in() {
// Send to input // Send to input
let r_message = assbuf_in let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr) .insert_frame(&frame, r_remote_addr)
.into_result() .into_io_result()
.expect("should get a value") .expect("should get a value")
.expect("should get something out"); .expect("should get something out");
@ -114,7 +114,7 @@ pub async fn test_one_frag_out_in() {
// Send to input // Send to input
let r_message = assbuf_in let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr) .insert_frame(&frame, r_remote_addr)
.into_result() .into_io_result()
.expect("should get a value"); .expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
@ -179,7 +179,7 @@ pub async fn test_many_frags_out_in() {
// Send to input // Send to input
let r_message = assbuf_in let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr) .insert_frame(&frame, r_remote_addr)
.into_result() .into_io_result()
.expect("should get a value"); .expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
@ -244,7 +244,7 @@ pub async fn test_many_frags_out_in_single_host() {
// Send to input // Send to input
let r_message = assbuf_in let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr) .insert_frame(&frame, r_remote_addr)
.into_result() .into_io_result()
.expect("should get a value"); .expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
@ -271,7 +271,7 @@ pub async fn test_many_frags_with_drops() {
let first = first.clone(); let first = first.clone();
async move { async move {
// Send only first packet, drop rest // Send only first packet, drop rest
if first.swap(false, Ordering::Relaxed) { if first.swap(false, Ordering::AcqRel) {
net_tx net_tx
.send_async((framed_chunk, remote_addr)) .send_async((framed_chunk, remote_addr))
.await .await
@ -306,7 +306,7 @@ pub async fn test_many_frags_with_drops() {
Ok(NetworkResult::Value(())) Ok(NetworkResult::Value(()))
)); ));
first.store(true, Ordering::Relaxed); first.store(true, Ordering::Release);
} }
println!("all_sent len={}", all_sent.len()); println!("all_sent len={}", all_sent.len());
@ -322,7 +322,7 @@ pub async fn test_many_frags_with_drops() {
// Send to input // Send to input
let r_message = assbuf_in let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr) .insert_frame(&frame, r_remote_addr)
.into_result() .into_io_result()
.expect("should get a value"); .expect("should get a value");
// We should have gotten the same message // We should have gotten the same message
@ -399,7 +399,7 @@ pub async fn test_many_frags_reordered() {
// Send to input // Send to input
let r_message = assbuf_in let r_message = assbuf_in
.insert_frame(&frame, r_remote_addr) .insert_frame(&frame, r_remote_addr)
.into_result() .into_io_result()
.expect("should get a value"); .expect("should get a value");
// We should have gotten the same message // We should have gotten the same message

View File

@ -58,7 +58,7 @@ impl<E: Send + 'static> TickTask<E> {
} }
pub fn is_running(&self) -> bool { pub fn is_running(&self) -> bool {
self.running.load(core::sync::atomic::Ordering::Relaxed) self.running.load(core::sync::atomic::Ordering::Acquire)
} }
pub async fn stop(&self) -> Result<(), E> { pub async fn stop(&self) -> Result<(), E> {
@ -120,9 +120,9 @@ impl<E: Send + 'static> TickTask<E> {
let running = self.running.clone(); let running = self.running.clone();
let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now); let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now);
let wrapped_routine = Box::pin(async move { let wrapped_routine = Box::pin(async move {
running.store(true, core::sync::atomic::Ordering::Relaxed); running.store(true, core::sync::atomic::Ordering::Release);
let out = routine.await; let out = routine.await;
running.store(false, core::sync::atomic::Ordering::Relaxed); running.store(false, core::sync::atomic::Ordering::Release);
out out
}); });
match self.single_future.single_spawn(wrapped_routine).await { match self.single_future.single_spawn(wrapped_routine).await {

View File

@ -18,21 +18,21 @@ extern "C" {
pub fn is_browser() -> bool { pub fn is_browser() -> bool {
static CACHE: AtomicI8 = AtomicI8::new(-1); static CACHE: AtomicI8 = AtomicI8::new(-1);
let cache = CACHE.load(Ordering::Relaxed); let cache = CACHE.load(Ordering::Acquire);
if cache != -1 { if cache != -1 {
return cache != 0; return cache != 0;
} }
let res = Reflect::has(global().as_ref(), &"navigator".into()).unwrap_or_default(); let res = Reflect::has(global().as_ref(), &"navigator".into()).unwrap_or_default();
CACHE.store(res as i8, Ordering::Relaxed); CACHE.store(res as i8, Ordering::Release);
res res
} }
pub fn is_browser_https() -> bool { pub fn is_browser_https() -> bool {
static CACHE: AtomicI8 = AtomicI8::new(-1); static CACHE: AtomicI8 = AtomicI8::new(-1);
let cache = CACHE.load(Ordering::Relaxed); let cache = CACHE.load(Ordering::Acquire);
if cache != -1 { if cache != -1 {
return cache != 0; return cache != 0;
} }
@ -41,7 +41,7 @@ pub fn is_browser_https() -> bool {
.map(|res| res.is_truthy()) .map(|res| res.is_truthy())
.unwrap_or_default(); .unwrap_or_default();
CACHE.store(res as i8, Ordering::Relaxed); CACHE.store(res as i8, Ordering::Release);
res res
} }

View File

@ -191,7 +191,7 @@ pub fn initialize_veilid_wasm() {
static INITIALIZED: AtomicBool = AtomicBool::new(false); static INITIALIZED: AtomicBool = AtomicBool::new(false);
#[wasm_bindgen()] #[wasm_bindgen()]
pub fn initialize_veilid_core(platform_config: String) { pub fn initialize_veilid_core(platform_config: String) {
if INITIALIZED.swap(true, Ordering::Relaxed) { if INITIALIZED.swap(true, Ordering::AcqRel) {
return; return;
} }
let platform_config: VeilidWASMConfig = veilid_core::deserialize_json(&platform_config) let platform_config: VeilidWASMConfig = veilid_core::deserialize_json(&platform_config)

View File

@ -30,7 +30,7 @@ pub struct VeilidClient {}
#[wasm_bindgen(js_class = veilidClient)] #[wasm_bindgen(js_class = veilidClient)]
impl VeilidClient { impl VeilidClient {
pub async fn initializeCore(platformConfig: VeilidWASMConfig) { pub async fn initializeCore(platformConfig: VeilidWASMConfig) {
if INITIALIZED.swap(true, Ordering::Relaxed) { if INITIALIZED.swap(true, Ordering::AcqRel) {
return; return;
} }
console_error_panic_hook::set_once(); console_error_panic_hook::set_once();