This commit is contained in:
John Smith 2022-11-05 18:50:20 -04:00
parent b2dd3bf9b7
commit cd892d077a
4 changed files with 258 additions and 133 deletions

View File

@ -699,7 +699,11 @@ impl RouteSpecStore {
} }
} }
// Remove from end nodes cache // Remove from end nodes cache
match inner.cache.used_nodes.entry(*detail.hops.last().unwrap()) { match inner
.cache
.used_end_nodes
.entry(*detail.hops.last().unwrap())
{
std::collections::hash_map::Entry::Occupied(mut o) => { std::collections::hash_map::Entry::Occupied(mut o) => {
*o.get_mut() -= 1; *o.get_mut() -= 1;
if *o.get() == 0 { if *o.get() == 0 {
@ -707,7 +711,7 @@ impl RouteSpecStore {
} }
} }
std::collections::hash_map::Entry::Vacant(_) => { std::collections::hash_map::Entry::Vacant(_) => {
panic!("used_nodes cache should have contained hop"); panic!("used_end_nodes cache should have contained hop");
} }
} }
} else { } else {
@ -1161,12 +1165,24 @@ impl RouteSpecStore {
let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>(); let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>();
encode_private_route(&private_route, &mut pr_builder) encode_private_route(&private_route, &mut pr_builder)
.wrap_err("failed to encode private route")?; .wrap_err("failed to encode private route")?;
builder_to_vec(pr_message).wrap_err("failed to convert builder to vec")
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &pr_message)
.wrap_err("failed to convert builder to vec")?;
Ok(buffer)
// builder_to_vec(pr_message).wrap_err("failed to convert builder to vec")
} }
/// Convert binary blob to private route /// Convert binary blob to private route
pub fn blob_to_private_route(blob: Vec<u8>) -> EyreResult<PrivateRoute> { pub fn blob_to_private_route(blob: Vec<u8>) -> EyreResult<PrivateRoute> {
let reader = ::capnp::message::Reader::new(RPCMessageData::new(blob), Default::default()); let reader = capnp::serialize_packed::read_message(
blob.as_slice(),
capnp::message::ReaderOptions::new(),
)
.wrap_err("failed to make message reader")?;
//let reader = ::capnp::message::Reader::new(RPCMessageData::new(blob), Default::default());
let pr_reader = reader let pr_reader = reader
.get_root::<veilid_capnp::private_route::Reader>() .get_root::<veilid_capnp::private_route::Reader>()
.wrap_err("failed to make reader for private_route")?; .wrap_err("failed to make reader for private_route")?;

View File

@ -30,7 +30,6 @@ pub use rpc_status::*;
use super::*; use super::*;
use crate::crypto::*; use crate::crypto::*;
use crate::xx::*; use crate::xx::*;
use capnp::message::ReaderSegments;
use futures_util::StreamExt; use futures_util::StreamExt;
use network_manager::*; use network_manager::*;
use receipt_manager::*; use receipt_manager::*;
@ -98,17 +97,27 @@ impl RPCMessageData {
pub fn new(contents: Vec<u8>) -> Self { pub fn new(contents: Vec<u8>) -> Self {
Self { contents } Self { contents }
} }
pub fn get_reader(
&self,
) -> Result<capnp::message::Reader<capnp::serialize::OwnedSegments>, RPCError> {
capnp::serialize_packed::read_message(
self.contents.as_slice(),
capnp::message::ReaderOptions::new(),
)
.map_err(RPCError::protocol)
}
} }
impl ReaderSegments for RPCMessageData { // impl ReaderSegments for RPCMessageData {
fn get_segment(&self, idx: u32) -> Option<&[u8]> { // fn get_segment(&self, idx: u32) -> Option<&[u8]> {
if idx > 0 { // if idx > 0 {
None // None
} else { // } else {
Some(self.contents.as_slice()) // Some(self.contents.as_slice())
} // }
} // }
} // }
#[derive(Debug)] #[derive(Debug)]
struct RPCMessageEncoded { struct RPCMessageEncoded {
@ -127,12 +136,17 @@ pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<
where where
T: capnp::message::Allocator + 'a, T: capnp::message::Allocator + 'a,
{ {
let wordvec = builder let mut buffer = vec![];
.into_reader() capnp::serialize_packed::write_message(&mut buffer, &builder)
.canonicalize()
.map_err(RPCError::protocol) .map_err(RPCError::protocol)
.map_err(logthru_rpc!())?; .map_err(logthru_rpc!())?;
Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec()) Ok(buffer)
// let wordvec = builder
// .into_reader()
// .canonicalize()
// .map_err(RPCError::protocol)
// .map_err(logthru_rpc!())?;
// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
} }
// fn reader_to_vec<'a, T>(reader: &capnp::message::Reader<T>) -> Result<Vec<u8>, RPCError> // fn reader_to_vec<'a, T>(reader: &capnp::message::Reader<T>) -> Result<Vec<u8>, RPCError>
@ -899,7 +913,7 @@ impl RPCProcessor {
// Decode the RPC message // Decode the RPC message
let operation = { let operation = {
let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); let reader = encoded_msg.data.get_reader()?;
let op_reader = reader let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(RPCError::protocol) .map_err(RPCError::protocol)
@ -945,7 +959,7 @@ impl RPCProcessor {
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
// Decode the RPC message // Decode the RPC message
let operation = { let operation = {
let reader = capnp::message::Reader::new(encoded_msg.data, Default::default()); let reader = encoded_msg.data.get_reader()?;
let op_reader = reader let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(RPCError::protocol) .map_err(RPCError::protocol)

View File

@ -325,15 +325,8 @@ impl RPCProcessor {
.cached_dh(&route.safety_route.public_key, &node_id_secret) .cached_dh(&route.safety_route.public_key, &node_id_secret)
.map_err(RPCError::protocol)?; .map_err(RPCError::protocol)?;
let dec_blob_data = Crypto::decrypt_aead(blob_data, &d.nonce, &dh_secret, None) let dec_blob_data = Crypto::decrypt_aead(blob_data, &d.nonce, &dh_secret, None)
.map_err(RPCError::map_internal( .map_err(RPCError::protocol)?;
"decryption of safety route hop failed", let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?;
))?;
let dec_blob_reader = capnp::message::Reader::new(
RPCMessageData {
contents: dec_blob_data,
},
Default::default(),
);
// Decode the blob appropriately // Decode the blob appropriately
if blob_tag == 1 { if blob_tag == 1 {
@ -387,15 +380,8 @@ impl RPCProcessor {
.map_err(RPCError::protocol)?; .map_err(RPCError::protocol)?;
let dec_blob_data = let dec_blob_data =
Crypto::decrypt_aead(&next_hop.blob, &next_hop.nonce, &dh_secret, None) Crypto::decrypt_aead(&next_hop.blob, &next_hop.nonce, &dh_secret, None)
.map_err(RPCError::map_internal( .map_err(RPCError::protocol)?;
"decryption of private route hop failed", let dec_blob_reader = RPCMessageData::new(dec_blob_data).get_reader()?;
))?;
let dec_blob_reader = capnp::message::Reader::new(
RPCMessageData {
contents: dec_blob_data,
},
Default::default(),
);
// Decode next RouteHop // Decode next RouteHop
let route_hop = { let route_hop = {

View File

@ -2,9 +2,19 @@
// Debugging // Debugging
use super::*; use super::*;
use data_encoding::BASE64URL_NOPAD;
use routing_table::*; use routing_table::*;
use rpc_processor::*; use rpc_processor::*;
#[derive(Default, Debug)]
struct DebugCache {
imported_routes: Vec<PrivateRoute>,
}
static DEBUG_CACHE: Mutex<DebugCache> = Mutex::new(DebugCache {
imported_routes: Vec::new(),
});
fn get_bucket_entry_state(text: &str) -> Option<BucketEntryState> { fn get_bucket_entry_state(text: &str) -> Option<BucketEntryState> {
if text == "dead" { if text == "dead" {
Some(BucketEntryState::Dead) Some(BucketEntryState::Dead)
@ -44,6 +54,126 @@ fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option<DHTKey> {
}; };
} }
fn get_safety_selection(text: &str, rss: RouteSpecStore) -> Option<SafetySelection> {
if text.len() == 0 {
return None;
}
if &text[0..1] == "-" {
// Unsafe
let text = &text[1..];
let seq = get_sequencing(text).unwrap_or(Sequencing::NoPreference);
Some(SafetySelection::Unsafe(seq))
} else {
// Safe
let mut preferred_route = None;
let mut hop_count = 2;
let mut stability = Stability::LowLatency;
let mut sequencing = Sequencing::NoPreference;
for x in text.split(",") {
let x = x.trim();
if let Some(pr) = get_route_id(rss.clone())(x) {
preferred_route = Some(pr)
}
if let Some(n) = get_number(x) {
hop_count = n;
}
if let Some(s) = get_stability(x) {
stability = s;
}
if let Some(s) = get_sequencing(x) {
sequencing = s;
}
}
let ss = SafetySpec {
preferred_route,
hop_count,
stability,
sequencing,
};
Some(SafetySelection::Safe(ss))
}
}
fn get_node_ref_modifiers(mut node_ref: NodeRef) -> impl FnOnce(&str) -> Option<NodeRef> {
move |text| {
for m in text.split("/") {
if let Some(pt) = get_protocol_type(m) {
node_ref.merge_filter(NodeRefFilter::new().with_protocol_type(pt));
} else if let Some(at) = get_address_type(m) {
node_ref.merge_filter(NodeRefFilter::new().with_address_type(at));
} else if let Some(rd) = get_routing_domain(m) {
node_ref.merge_filter(NodeRefFilter::new().with_routing_domain(rd));
} else {
return None;
}
}
Some(node_ref)
}
}
fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<Destination> {
move |text| {
// Safety selection
let (text, ss) = if let Some((first, second)) = text.split_once('+') {
let ss = get_safety_selection(second, routing_table.route_spec_store())?;
(first, Some(ss))
} else {
(text, None)
};
if text.len() == 0 {
return None;
}
if &text[0..1] == "#" {
// Private route
let text = &text[1..];
let n = get_number(text)?;
let dc = DEBUG_CACHE.lock();
let r = dc.imported_routes.get(n)?;
Some(Destination::private_route(
r.clone(),
ss.unwrap_or(SafetySelection::Unsafe(Sequencing::NoPreference)),
))
} else {
let (text, mods) = text
.split_once('/')
.map(|x| (x.0, Some(x.1)))
.unwrap_or((text, None));
if let Some((first, second)) = text.split_once('@') {
// Relay
let relay_id = get_dht_key(second)?;
let mut relay_nr = routing_table.lookup_node_ref(relay_id)?;
let target_id = get_dht_key(first)?;
if let Some(mods) = mods {
relay_nr = get_node_ref_modifiers(relay_nr)(mods)?;
}
let mut d = Destination::relay(relay_nr, target_id);
if let Some(ss) = ss {
d = d.with_safety(ss)
}
Some(d)
} else {
// Direct
let target_id = get_dht_key(text)?;
let mut target_nr = routing_table.lookup_node_ref(target_id)?;
if let Some(mods) = mods {
target_nr = get_node_ref_modifiers(target_nr)(mods)?;
}
let mut d = Destination::direct(target_nr);
if let Some(ss) = ss {
d = d.with_safety(ss)
}
Some(d)
}
}
}
}
fn get_number(text: &str) -> Option<usize> { fn get_number(text: &str) -> Option<usize> {
usize::from_str(text).ok() usize::from_str(text).ok()
} }
@ -51,6 +181,22 @@ fn get_dht_key(text: &str) -> Option<DHTKey> {
DHTKey::try_decode(text).ok() DHTKey::try_decode(text).ok()
} }
fn get_node_ref(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<NodeRef> {
move |text| {
let (text, mods) = text
.split_once('/')
.map(|x| (x.0, Some(x.1)))
.unwrap_or((text, None));
let node_id = get_dht_key(text)?;
let mut nr = routing_table.lookup_node_ref(node_id)?;
if let Some(mods) = mods {
nr = get_node_ref_modifiers(nr)(mods)?;
}
Some(nr)
}
}
fn get_protocol_type(text: &str) -> Option<ProtocolType> { fn get_protocol_type(text: &str) -> Option<ProtocolType> {
let lctext = text.to_ascii_lowercase(); let lctext = text.to_ascii_lowercase();
if lctext == "udp" { if lctext == "udp" {
@ -366,55 +512,19 @@ impl VeilidAPI {
async fn debug_contact(&self, args: String) -> Result<String, VeilidAPIError> { async fn debug_contact(&self, args: String) -> Result<String, VeilidAPIError> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect(); let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let node_id = get_debug_argument_at(&args, 0, "debug_contact", "node_id", get_dht_key)?;
let network_manager = self.network_manager()?; let network_manager = self.network_manager()?;
let routing_table = network_manager.routing_table(); let routing_table = network_manager.routing_table();
let mut nr = match routing_table.lookup_node_ref(node_id) { let node_ref = get_debug_argument_at(
Some(nr) => nr,
None => return Ok("Node id not found in routing table".to_owned()),
};
let mut ai = 1;
let mut routing_domain = None;
while ai < args.len() {
if let Ok(pt) = get_debug_argument_at(
&args, &args,
ai, 0,
"debug_contact", "debug_contact",
"protocol_type", "node_ref",
get_protocol_type, get_node_ref(routing_table),
) { )?;
nr.merge_filter(NodeRefFilter::new().with_protocol_type(pt));
} else if let Ok(at) =
get_debug_argument_at(&args, ai, "debug_contact", "address_type", get_address_type)
{
nr.merge_filter(NodeRefFilter::new().with_address_type(at));
} else if let Ok(rd) = get_debug_argument_at(
&args,
ai,
"debug_contact",
"routing_domain",
get_routing_domain,
) {
if routing_domain.is_none() {
routing_domain = Some(rd);
} else {
return Ok("Multiple routing domains specified".to_owned());
}
} else {
return Ok(format!("Invalid argument specified: {}", args[ai]));
}
ai += 1;
}
if let Some(routing_domain) = routing_domain {
nr.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain))
}
let cm = network_manager let cm = network_manager
.get_node_contact_method(nr) .get_node_contact_method(node_ref)
.map_err(VeilidAPIError::internal)?; .map_err(VeilidAPIError::internal)?;
Ok(format!("{:#?}", cm)) Ok(format!("{:#?}", cm))
@ -427,49 +537,17 @@ impl VeilidAPI {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect(); let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let node_id = get_debug_argument_at(&args, 0, "debug_ping", "node_id", get_dht_key)?; let dest = get_debug_argument_at(
let mut nr = match routing_table.lookup_node_ref(node_id) {
Some(nr) => nr,
None => return Ok("Node id not found in routing table".to_owned()),
};
let mut ai = 1;
let mut routing_domain = None;
while ai < args.len() {
if let Ok(pt) =
get_debug_argument_at(&args, ai, "debug_ping", "protocol_type", get_protocol_type)
{
nr.merge_filter(NodeRefFilter::new().with_protocol_type(pt));
} else if let Ok(at) =
get_debug_argument_at(&args, ai, "debug_ping", "address_type", get_address_type)
{
nr.merge_filter(NodeRefFilter::new().with_address_type(at));
} else if let Ok(rd) = get_debug_argument_at(
&args, &args,
ai, 0,
"debug_ping", "debug_ping",
"routing_domain", "destination",
get_routing_domain, get_destination(routing_table),
) { )?;
if routing_domain.is_none() {
routing_domain = Some(rd);
} else {
return Ok("Multiple routing domains specified".to_owned());
}
} else {
return Ok(format!("Invalid argument specified: {}", args[ai]));
}
ai += 1;
}
if let Some(routing_domain) = routing_domain {
nr.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain))
}
// Dump routing table entry // Dump routing table entry
let out = match rpc let out = match rpc
.rpc_call_status(Destination::direct(nr)) .rpc_call_status(dest)
.await .await
.map_err(VeilidAPIError::internal)? .map_err(VeilidAPIError::internal)?
{ {
@ -595,7 +673,14 @@ impl VeilidAPI {
// Convert to blob // Convert to blob
let blob_data = RouteSpecStore::private_route_to_blob(&private_route) let blob_data = RouteSpecStore::private_route_to_blob(&private_route)
.map_err(VeilidAPIError::internal)?; .map_err(VeilidAPIError::internal)?;
data_encoding::BASE64URL_NOPAD.encode(&blob_data) let out = BASE64URL_NOPAD.encode(&blob_data);
info!(
"Published route {} as {} bytes:\n{}",
route_id.encode(),
blob_data.len(),
out
);
format!("Published route {}", route_id.encode())
} }
Err(e) => { Err(e) => {
format!("Couldn't assemble private route: {}", e) format!("Couldn't assemble private route: {}", e)
@ -646,9 +731,21 @@ impl VeilidAPI {
} }
Ok(out) Ok(out)
} }
async fn debug_route_import(&self, _args: Vec<String>) -> Result<String, VeilidAPIError> { async fn debug_route_import(&self, args: Vec<String>) -> Result<String, VeilidAPIError> {
// <blob> // <blob>
let out = format!("");
let blob = get_debug_argument_at(&args, 1, "debug_route", "blob", get_string)?;
let blob_dec = BASE64URL_NOPAD
.decode(blob.as_bytes())
.map_err(VeilidAPIError::generic)?;
let pr =
RouteSpecStore::blob_to_private_route(blob_dec).map_err(VeilidAPIError::generic)?;
let mut dc = DEBUG_CACHE.lock();
let n = dc.imported_routes.len();
let out = format!("Private route #{} imported: {}", n, pr.public_key);
dc.imported_routes.push(pr);
return Ok(out); return Ok(out);
} }
@ -682,22 +779,34 @@ impl VeilidAPI {
buckets [dead|reliable] buckets [dead|reliable]
dialinfo dialinfo
entries [dead|reliable] [limit] entries [dead|reliable] [limit]
entry <node_id> entry <node>
nodeinfo nodeinfo
config [key [new value]] config [key [new value]]
purge <buckets|connections> purge <buckets|connections>
attach attach
detach detach
restart network restart network
ping <node_id> [protocol_type][address_type][routing_domain] ping <destination>
contact <node_id> [protocol_type [address_type]] contact <node>[<modifiers>]
route allocate [ord|*ord] [rel] [<count>] [in|out] route allocate [ord|*ord] [rel] [<count>] [in|out]
route release <route id> release <route>
route publish <route id> [full] publish <route> [full]
route unpublish <route id> unpublish <route>
route print <route id> print <route>
route list list
route import <blob> import <blob>
<destination> is:
* direct: <node>[+<safety>][<modifiers>]
* relay: <relay>@<target>[+<safety>][<modifiers>]
* private: #<id>[+<safety>]
<safety> is:
* unsafe: -[ord|*ord]
* safe: [route][,ord|*ord][,rel][,<count>]
<modifiers> is: [/<protocoltype>][/<addresstype>][/<routingdomain>]
<protocoltype> is: udp|tcp|ws|wss
<addresstype> is: ipv4|ipv6
<routingdomain> is: public|local
"# "#
.to_owned()) .to_owned())
} }