more capability work

This commit is contained in:
John Smith
2023-07-04 00:24:55 -04:00
parent e674eaf496
commit 8f721c428b
33 changed files with 456 additions and 474 deletions
@@ -31,6 +31,18 @@ pub fn encode_node_info(
s.clone_from_slice(&csvec);
}
let mut cap_builder = builder
.reborrow()
.init_capabilities(node_info.capabilities().len() as u32);
if let Some(s) = cap_builder.as_slice() {
let capvec: Vec<u32> = node_info
.capabilities()
.iter()
.map(|x| u32::from_be_bytes(x.0))
.collect();
s.clone_from_slice(&capvec);
}
let mut didl_builder = builder.reborrow().init_dial_info_detail_list(
node_info
.dial_info_detail_list()
@@ -121,6 +133,18 @@ pub fn decode_node_info(reader: &veilid_capnp::node_info::Reader) -> Result<Node
return Err(RPCError::protocol("no crypto kinds"));
}
let cap_reader = reader
.reborrow()
.get_capabilities()
.map_err(RPCError::protocol)?;
if cap_reader.len() as usize > MAX_CAPABILITIES {
return Err(RPCError::protocol("too many capabilities"));
}
let capabilities = cap_reader
.as_slice()
.map(|s| s.iter().map(|x| FourCC::from(x.to_be_bytes())).collect())
.unwrap_or_default();
let didl_reader = reader
.reborrow()
.get_dial_info_detail_list()
@@ -141,6 +165,7 @@ pub fn decode_node_info(reader: &veilid_capnp::node_info::Reader) -> Result<Node
address_types,
envelope_support,
crypto_support,
capabilities,
dial_info_detail_list,
))
}
@@ -1,113 +1,14 @@
use super::*;
pub fn encode_public_internet_node_status(
public_internet_node_status: &PublicInternetNodeStatus,
builder: &mut veilid_capnp::public_internet_node_status::Builder,
) -> Result<(), RPCError> {
let mut cap_builder = builder
.reborrow()
.init_capabilities(public_internet_node_status.capabilities.len() as u32);
if let Some(s) = cap_builder.as_slice() {
let capvec: Vec<u32> = public_internet_node_status
.capabilities
.iter()
.map(|x| u32::from_be_bytes(x.0))
.collect();
s.clone_from_slice(&capvec);
}
Ok(())
}
pub fn decode_public_internet_node_status(
reader: &veilid_capnp::public_internet_node_status::Reader,
) -> Result<PublicInternetNodeStatus, RPCError> {
let cap_reader = reader
.reborrow()
.get_capabilities()
.map_err(RPCError::protocol)?;
if cap_reader.len() as usize > MAX_CAPABILITIES {
return Err(RPCError::protocol("too many capabilities"));
}
let capabilities = cap_reader
.as_slice()
.map(|s| s.iter().map(|x| FourCC::from(x.to_be_bytes())).collect())
.unwrap_or_default();
Ok(PublicInternetNodeStatus { capabilities })
}
pub fn encode_local_network_node_status(
local_network_node_status: &LocalNetworkNodeStatus,
builder: &mut veilid_capnp::local_network_node_status::Builder,
) -> Result<(), RPCError> {
let mut cap_builder = builder
.reborrow()
.init_capabilities(local_network_node_status.capabilities.len() as u32);
if let Some(s) = cap_builder.as_slice() {
let capvec: Vec<u32> = local_network_node_status
.capabilities
.iter()
.map(|x| u32::from_be_bytes(x.0))
.collect();
s.clone_from_slice(&capvec);
}
Ok(())
}
pub fn decode_local_network_node_status(
reader: &veilid_capnp::local_network_node_status::Reader,
) -> Result<LocalNetworkNodeStatus, RPCError> {
let cap_reader = reader
.reborrow()
.get_capabilities()
.map_err(RPCError::protocol)?;
if cap_reader.len() as usize > MAX_CAPABILITIES {
return Err(RPCError::protocol("too many capabilities"));
}
let capabilities = cap_reader
.as_slice()
.map(|s| s.iter().map(|x| FourCC::from(x.to_be_bytes())).collect())
.unwrap_or_default();
Ok(LocalNetworkNodeStatus { capabilities })
}
pub fn encode_node_status(
node_status: &NodeStatus,
builder: &mut veilid_capnp::node_status::Builder,
_node_status: &NodeStatus,
_builder: &mut veilid_capnp::node_status::Builder,
) -> Result<(), RPCError> {
match node_status {
NodeStatus::PublicInternet(ns) => {
let mut pi_builder = builder.reborrow().init_public_internet();
encode_public_internet_node_status(&ns, &mut pi_builder)
}
NodeStatus::LocalNetwork(ns) => {
let mut ln_builder = builder.reborrow().init_local_network();
encode_local_network_node_status(&ns, &mut ln_builder)
}
}
Ok(())
}
pub fn decode_node_status(
reader: &veilid_capnp::node_status::Reader,
_reader: &veilid_capnp::node_status::Reader,
) -> Result<NodeStatus, RPCError> {
Ok(
match reader
.which()
.map_err(RPCError::map_internal("invalid node status"))?
{
veilid_capnp::node_status::PublicInternet(pi) => {
let r = pi.map_err(RPCError::protocol)?;
let pins = decode_public_internet_node_status(&r)?;
NodeStatus::PublicInternet(pins)
}
veilid_capnp::node_status::LocalNetwork(ln) => {
let r = ln.map_err(RPCError::protocol)?;
let lnns = decode_local_network_node_status(&r)?;
NodeStatus::LocalNetwork(lnns)
}
},
)
Ok(NodeStatus {})
}
+7
View File
@@ -117,6 +117,13 @@ impl RPCMessageHeader {
// RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.peer_noderef.clone(),
// }
// }
pub fn routing_domain(&self) -> RoutingDomain {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.routing_domain,
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.routing_domain,
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.routing_domain,
}
}
pub fn direct_sender_node_id(&self) -> TypedKey {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => {
@@ -54,10 +54,14 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
let routing_table = self.routing_table();
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_APPMESSAGE) {
return Ok(NetworkResult::service_unavailable("appcall is disabled"));
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_appmessage() {
return Ok(NetworkResult::service_unavailable(
"app call is not available",
));
}
}
}
@@ -25,10 +25,14 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
let routing_table = self.routing_table();
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_APPMESSAGE) {
return Ok(NetworkResult::service_unavailable("appmessage is disabled"));
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_appmessage() {
return Ok(NetworkResult::service_unavailable(
"app message is not available",
));
}
}
}
@@ -7,12 +7,17 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
#[cfg(feature = "unstable-tunnels")]
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_TUNNEL) {
return Ok(NetworkResult::service_unavailable(
"cancel tunnel is disabled",
));
let routing_table = self.routing_table();
{
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_tunnel() {
return Ok(NetworkResult::service_unavailable(
"tunnel is not available",
));
}
}
}
}
@@ -7,12 +7,17 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
#[cfg(feature = "unstable-tunnels")]
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_TUNNEL) {
return Ok(NetworkResult::service_unavailable(
"complete tunnel is disabled",
));
let routing_table = self.routing_table();
{
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_tunnel() {
return Ok(NetworkResult::service_unavailable(
"tunnel is not available",
));
}
}
}
}
Err(RPCError::unimplemented("process_complete_tunnel_q"))
@@ -7,10 +7,17 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
#[cfg(feature = "unstable-blockstore")]
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_BLOCKSTORE) {
return Ok(NetworkResult::service_unavailable("find block is disabled"));
let routing_table = self.routing_table();
{
if let Some(opi) = routing_table.get_own_peer_info(detail.routing_domain) {
if !opi.signed_node_info().node_info().can_blockstore() {
return Ok(NetworkResult::service_unavailable(
"block store is not available",
));
}
}
}
}
Err(RPCError::unimplemented("process_find_block_q"))
+12 -8
View File
@@ -163,13 +163,7 @@ impl RPCProcessor {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_DHT) {
return Ok(NetworkResult::service_unavailable("get value is disabled"));
}
}
// Ensure this never came over a private route, safety route is okay though
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
@@ -179,7 +173,17 @@ impl RPCProcessor {
))
}
}
// Ignore if disabled
let routing_table = self.routing_table();
{
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_dht() {
return Ok(NetworkResult::service_unavailable(
"dht is not available",
));
}
}
}
// Get the question
let kind = msg.operation.kind().clone();
let get_value_q = match kind {
+7 -8
View File
@@ -366,15 +366,14 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
let routing_table = self.routing_table();
{
let c = self.config.get();
if c.capabilities
.disable
.contains(&CAP_WILL_ROUTE)
{
return Ok(NetworkResult::service_unavailable(
"route is disabled",
));
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_route() {
return Ok(NetworkResult::service_unavailable(
"route is not available",
));
}
}
}
@@ -176,10 +176,14 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
let routing_table = self.routing_table();
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_DHT) {
return Ok(NetworkResult::service_unavailable("set value is disabled"));
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_dht() {
return Ok(NetworkResult::service_unavailable(
"dht is not available",
));
}
}
}
// Ensure this never came over a private route, safety route is okay though
+7 -3
View File
@@ -38,10 +38,14 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
let routing_table = self.routing_table();
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_SIGNAL) {
return Ok(NetworkResult::service_unavailable("signal is disabled"));
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_signal() {
return Ok(NetworkResult::service_unavailable(
"signal is not available",
));
}
}
}
@@ -7,12 +7,17 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
#[cfg(feature = "unstable-tunnels")]
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_TUNNEL) {
return Ok(NetworkResult::service_unavailable(
"start tunnel is disabled",
));
let routing_table = self.routing_table();
{
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_tunnel() {
return Ok(NetworkResult::service_unavailable(
"tunnel is not available",
));
}
}
}
}
Err(RPCError::unimplemented("process_start_tunnel_q"))
+2 -36
View File
@@ -133,25 +133,8 @@ impl RPCProcessor {
// Ensure the returned node status is the kind for the routing domain we asked for
if let Some(target_nr) = opt_target_nr {
if let Some(a_node_status) = a_node_status {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(a_node_status, NodeStatus::PublicInternet(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match PublicInternet routing domain",
));
}
}
RoutingDomain::LocalNetwork => {
if !matches!(a_node_status, NodeStatus::LocalNetwork(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match LocalNetwork routing domain",
));
}
}
}
// Update latest node status in routing table
target_nr.update_node_status(a_node_status.clone());
target_nr.update_node_status(routing_domain, a_node_status.clone());
}
}
@@ -236,27 +219,10 @@ impl RPCProcessor {
// Ensure the node status from the question is the kind for the routing domain we received the request in
if let Some(q_node_status) = q_node_status {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(q_node_status, NodeStatus::PublicInternet(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match PublicInternet routing domain",
));
}
}
RoutingDomain::LocalNetwork => {
if !matches!(q_node_status, NodeStatus::LocalNetwork(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match LocalNetwork routing domain",
));
}
}
}
// update node status for the requesting node to our routing table
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
// Update latest node status in routing table for the statusq sender
sender_nr.update_node_status(q_node_status.clone());
sender_nr.update_node_status(routing_domain, q_node_status.clone());
}
}
@@ -7,12 +7,17 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
#[cfg(feature = "unstable-blockstore")]
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_BLOCKSTORE) {
return Ok(NetworkResult::service_unavailable(
"supply block is disabled",
));
let routing_table = self.routing_table();
{
if let Some(opi) = routing_table.get_own_peer_info(detail.routing_domain) {
if !opi.signed_node_info().node_info().can_blockstore() {
return Ok(NetworkResult::service_unavailable(
"block store is not available",
));
}
}
}
}
Err(RPCError::unimplemented("process_supply_block_q"))
@@ -58,19 +58,6 @@ impl RPCProcessor {
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
{
let c = self.config.get();
if c.capabilities
.disable
.contains(&CAP_WILL_VALIDATE_DIAL_INFO)
{
return Ok(NetworkResult::service_unavailable(
"validate dial info is disabled",
));
}
}
let detail = match msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => detail,
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
@@ -80,6 +67,18 @@ impl RPCProcessor {
}
};
// Ignore if disabled
let routing_table = self.routing_table();
{
if let Some(opi) = routing_table.get_own_peer_info(detail.routing_domain) {
if !opi.signed_node_info().node_info().can_validate_dial_info() {
return Ok(NetworkResult::service_unavailable(
"validate dial info is not available",
));
}
}
}
// Get the statement
let (_, _, _, kind) = msg.operation.destructure();
let (dial_info, receipt, redirect) = match kind {
@@ -96,7 +95,6 @@ impl RPCProcessor {
// We filter on the -outgoing- protocol capability status not the node's dial info
// Use the address type though, to ensure we reach an ipv6 capable node if this is
// an ipv6 address
let routing_table = self.routing_table();
let sender_node_id = TypedKey::new(
detail.envelope.get_crypto_kind(),
detail.envelope.get_sender_id(),
@@ -117,11 +115,9 @@ impl RPCProcessor {
move |rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
let entry = v.unwrap();
entry.with(rti, move |_rti, e| {
if let Some(status) = &e.node_status(routing_domain) {
status.has_capability(CAP_WILL_VALIDATE_DIAL_INFO)
} else {
true
}
e.node_info(routing_domain)
.map(|ni| ni.can_validate_dial_info())
.unwrap_or(false)
})
},
) as RoutingTableEntryFilter;
@@ -7,12 +7,12 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
let routing_table = self.routing_table();
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_DHT) {
return Ok(NetworkResult::service_unavailable(
"value changed is disabled",
));
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_dht() {
return Ok(NetworkResult::service_unavailable("dht is not available"));
}
}
}
Err(RPCError::unimplemented("process_value_changed"))
@@ -7,15 +7,14 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Ignore if disabled
let routing_table = self.routing_table();
{
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_WILL_DHT) {
return Ok(NetworkResult::service_unavailable(
"watch value is disabled",
));
if let Some(opi) = routing_table.get_own_peer_info(msg.header.routing_domain()) {
if !opi.signed_node_info().node_info().can_dht() {
return Ok(NetworkResult::service_unavailable("dht is not available"));
}
}
}
Err(RPCError::unimplemented("process_watch_value_q"))
}
}