checkpoint
This commit is contained in:
@@ -21,6 +21,7 @@ mod rpc_validate_dial_info;
|
||||
mod rpc_value_changed;
|
||||
mod rpc_watch_value;
|
||||
|
||||
pub use coders::*;
|
||||
pub use destination::*;
|
||||
pub use operation_waiter::*;
|
||||
pub use rpc_error::*;
|
||||
@@ -29,7 +30,6 @@ use super::*;
|
||||
use crate::dht::*;
|
||||
use crate::xx::*;
|
||||
use capnp::message::ReaderSegments;
|
||||
use coders::*;
|
||||
use futures_util::StreamExt;
|
||||
use network_manager::*;
|
||||
use receipt_manager::*;
|
||||
@@ -55,6 +55,8 @@ struct RPCMessageHeader {
|
||||
connection_descriptor: ConnectionDescriptor,
|
||||
/// The routing domain the message was sent through
|
||||
routing_domain: RoutingDomain,
|
||||
/// The private route the message was received through
|
||||
private_route: Option<DHTKey>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -134,7 +136,7 @@ struct RenderedOperation {
|
||||
message: Vec<u8>, // The rendered operation bytes
|
||||
node_id: DHTKey, // Destination node id we're sending to
|
||||
node_ref: NodeRef, // Node to send envelope to (may not be destination node id in case of relay)
|
||||
hop_count: usize, // Total safety + private route hop count
|
||||
hop_count: usize, // Total safety + private route hop count + 1 hop for the initial send
|
||||
}
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
||||
@@ -396,16 +398,25 @@ impl RPCProcessor {
|
||||
}
|
||||
|
||||
// Wrap an operation with a private route inside a safety route
|
||||
pub(super) fn wrap_with_route(xxx continue here
|
||||
pub(super) fn wrap_with_route(
|
||||
&self,
|
||||
safety_spec: SafetySpec,
|
||||
safety_spec: Option<SafetySpec>,
|
||||
private_route: PrivateRoute,
|
||||
message_data: Vec<u8>,
|
||||
) -> Result<RenderedOperation, RPCError> {
|
||||
let compiled_route: CompiledRoute = self.routing_table().with_route_spec_store(|rss| {
|
||||
// Compile the safety route with the private route
|
||||
rss.compile_safety_route(self.safety_spec, private_route)
|
||||
})?;
|
||||
) -> Result<NetworkResult<RenderedOperation>, RPCError> {
|
||||
let routing_table = self.routing_table();
|
||||
let compiled_route: CompiledRoute =
|
||||
match self.routing_table().with_route_spec_store(|rss, rti| {
|
||||
// Compile the safety route with the private route
|
||||
rss.compile_safety_route(rti, routing_table, safety_spec, private_route)
|
||||
})? {
|
||||
Some(cr) => cr,
|
||||
None => {
|
||||
return Ok(NetworkResult::no_connection_other(
|
||||
"private route could not be compiled at this time",
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// Encrypt routed operation
|
||||
// Xmsg + ENC(Xmsg, DH(PKapr, SKbsr))
|
||||
@@ -421,30 +432,42 @@ impl RPCProcessor {
|
||||
let operation = RoutedOperation::new(nonce, enc_msg_data);
|
||||
|
||||
// Prepare route operation
|
||||
let route = RPCOperationRoute {
|
||||
safety_route,
|
||||
let route_operation = RPCOperationRoute {
|
||||
safety_route: compiled_route.safety_route,
|
||||
operation,
|
||||
};
|
||||
let operation =
|
||||
RPCOperation::new_statement(RPCStatement::new(RPCStatementDetail::Route(route)), None);
|
||||
let operation = RPCOperation::new_statement(
|
||||
RPCStatement::new(RPCStatementDetail::Route(route_operation)),
|
||||
None,
|
||||
);
|
||||
|
||||
// Convert message to bytes and return it
|
||||
let mut route_msg = ::capnp::message::Builder::new_default();
|
||||
let mut route_operation = route_msg.init_root::<veilid_capnp::operation::Builder>();
|
||||
operation.encode(&mut route_operation)?;
|
||||
let out = builder_to_vec(route_msg)?;
|
||||
let out_message = builder_to_vec(route_msg)?;
|
||||
|
||||
// out_node_id = sr
|
||||
// .hops
|
||||
// .first()
|
||||
// .ok_or_else(RPCError::else_internal("no hop in safety route"))?
|
||||
// .dial_info
|
||||
// .node_id
|
||||
// .key;
|
||||
// Get the first hop this is going to
|
||||
let out_node_id = compiled_route
|
||||
.safety_route
|
||||
.hops
|
||||
.first()
|
||||
.ok_or_else(RPCError::else_internal("no hop in safety route"))?
|
||||
.dial_info
|
||||
.node_id
|
||||
.key;
|
||||
|
||||
//out_hop_count = 1 + sr.hops.len();
|
||||
let out_hop_count =
|
||||
(1 + compiled_route.safety_route.hop_count + private_route.hop_count) as usize;
|
||||
|
||||
Ok(out)
|
||||
let out = RenderedOperation {
|
||||
message: out_message,
|
||||
node_id: out_node_id,
|
||||
node_ref: compiled_route.first_hop,
|
||||
hop_count: out_hop_count,
|
||||
};
|
||||
|
||||
Ok(NetworkResult::value(out))
|
||||
}
|
||||
|
||||
/// Produce a byte buffer that represents the wire encoding of the entire
|
||||
@@ -455,8 +478,8 @@ impl RPCProcessor {
|
||||
&self,
|
||||
dest: Destination,
|
||||
operation: &RPCOperation,
|
||||
) -> Result<RenderedOperation, RPCError> {
|
||||
let out: RenderedOperation;
|
||||
) -> Result<NetworkResult<RenderedOperation>, RPCError> {
|
||||
let out: NetworkResult<RenderedOperation>;
|
||||
|
||||
// Encode message to a builder and make a message reader for it
|
||||
// Then produce the message as an unencrypted byte buffer
|
||||
@@ -471,12 +494,12 @@ impl RPCProcessor {
|
||||
match dest {
|
||||
Destination::Direct {
|
||||
target: ref node_ref,
|
||||
safety,
|
||||
safety_spec,
|
||||
}
|
||||
| Destination::Relay {
|
||||
relay: ref node_ref,
|
||||
target: _,
|
||||
safety,
|
||||
safety_spec,
|
||||
} => {
|
||||
// Send to a node without a private route
|
||||
// --------------------------------------
|
||||
@@ -485,7 +508,7 @@ impl RPCProcessor {
|
||||
let (node_ref, node_id) = if let Destination::Relay {
|
||||
relay: _,
|
||||
target: ref dht_key,
|
||||
safety: _,
|
||||
safety_spec: _,
|
||||
} = dest
|
||||
{
|
||||
(node_ref.clone(), dht_key.clone())
|
||||
@@ -495,36 +518,36 @@ impl RPCProcessor {
|
||||
};
|
||||
|
||||
// Handle the existence of safety route
|
||||
match safety {
|
||||
false => {
|
||||
match safety_spec {
|
||||
None => {
|
||||
// If no safety route is being used, and we're not sending to a private
|
||||
// route, we can use a direct envelope instead of routing
|
||||
out = RenderedOperation {
|
||||
out = NetworkResult::value(RenderedOperation {
|
||||
message,
|
||||
node_id,
|
||||
node_ref,
|
||||
hop_count: 1,
|
||||
};
|
||||
});
|
||||
}
|
||||
true => {
|
||||
Some(safety_spec) => {
|
||||
// No private route was specified for the request
|
||||
// but we are using a safety route, so we must create an empty private route
|
||||
let private_route = PrivateRoute::new_stub(node_id);
|
||||
|
||||
// Wrap with safety route
|
||||
out = self.wrap_with_route(true, private_route, message)?;
|
||||
out = self.wrap_with_route(Some(safety_spec), private_route, message)?;
|
||||
}
|
||||
};
|
||||
}
|
||||
Destination::PrivateRoute {
|
||||
private_route,
|
||||
safety,
|
||||
safety_spec,
|
||||
reliable,
|
||||
} => {
|
||||
// Send to private route
|
||||
// ---------------------
|
||||
// Reply with 'route' operation
|
||||
out = self.wrap_with_route(safety, private_route, message)?;
|
||||
out = self.wrap_with_route(safety_spec, private_route, message)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -602,7 +625,7 @@ impl RPCProcessor {
|
||||
node_id,
|
||||
node_ref,
|
||||
hop_count,
|
||||
} = self.render_operation(dest, &operation)?;
|
||||
} = network_result_try!(self.render_operation(dest, &operation)?);
|
||||
|
||||
// Calculate answer timeout
|
||||
// Timeout is number of hops times the timeout per hop
|
||||
@@ -665,7 +688,7 @@ impl RPCProcessor {
|
||||
node_id,
|
||||
node_ref,
|
||||
hop_count: _,
|
||||
} = self.render_operation(dest, &operation)?;
|
||||
} = network_result_try!(self.render_operation(dest, &operation)?);
|
||||
|
||||
// Send statement
|
||||
let bytes = message.len() as u64;
|
||||
@@ -749,7 +772,7 @@ impl RPCProcessor {
|
||||
node_id,
|
||||
node_ref,
|
||||
hop_count: _,
|
||||
} = self.render_operation(dest, &operation)?;
|
||||
} = network_result_try!(self.render_operation(dest, &operation)?);
|
||||
|
||||
// Send the reply
|
||||
let bytes = message.len() as u64;
|
||||
|
Reference in New Issue
Block a user