removing dev branch, many changes
@@ -1,5 +1,6 @@
 mod coders;
 mod destination;
+mod fanout_call;
 mod operation_waiter;
 mod rpc_app_call;
 mod rpc_app_message;
@@ -22,18 +23,20 @@ mod rpc_watch_value;
 
 pub use coders::*;
 pub use destination::*;
+pub use fanout_call::*;
 pub use operation_waiter::*;
 pub use rpc_error::*;
 pub use rpc_status::*;
 
 use super::*;
 
-use crate::crypto::*;
+use crypto::*;
 use futures_util::StreamExt;
 use network_manager::*;
 use receipt_manager::*;
 use routing_table::*;
 use stop_token::future::FutureExt;
+use storage_manager::*;
 
 /////////////////////////////////////////////////////////////////////
 
@@ -149,7 +152,7 @@ where
 
 #[derive(Debug)]
 struct WaitableReply {
-    handle: OperationWaitHandle<RPCMessage>,
+    handle: OperationWaitHandle<RPCMessage, Option<QuestionContext>>,
     timeout_us: TimestampDuration,
     node_ref: NodeRef,
     send_ts: Timestamp,
@@ -235,8 +238,8 @@ pub struct RPCProcessorUnlockedInner {
     max_route_hop_count: usize,
     validate_dial_info_receipt_time_ms: u32,
     update_callback: UpdateCallback,
-    waiting_rpc_table: OperationWaiter<RPCMessage>,
-    waiting_app_call_table: OperationWaiter<Vec<u8>>,
+    waiting_rpc_table: OperationWaiter<RPCMessage, Option<QuestionContext>>,
+    waiting_app_call_table: OperationWaiter<Vec<u8>, ()>,
 }
 
 #[derive(Clone)]
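The two hunks above give the waiter types a second generic parameter: every pending operation now stores a per-operation context next to its op id (`Option<QuestionContext>` for the RPC question table, `()` for the app-call table), so answer validation can later retrieve the context of the question being answered. Below is a minimal std-only sketch of that shape; `Waiter` and its method bodies are hypothetical stand-ins for the crate's `OperationWaiter`, which additionally handles async wakeup and timeouts.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Waiter<T, C>: pending operations producing a T, each tagged with a context C
struct Waiter<T, C> {
    // op id -> (stored per-question context, slot for the eventual response)
    ops: Arc<Mutex<HashMap<u64, (C, Option<T>)>>>,
}

impl<T, C: Clone> Waiter<T, C> {
    fn new() -> Self {
        Self {
            ops: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    // Register a pending operation along with its context
    fn add_op_waiter(&self, op_id: u64, context: C) {
        self.ops.lock().unwrap().insert(op_id, (context, None));
    }

    // Fetch the context of an in-flight operation (used when validating answers)
    fn get_op_context(&self, op_id: u64) -> Option<C> {
        self.ops.lock().unwrap().get(&op_id).map(|(c, _)| c.clone())
    }
}

fn main() {
    // Mirrors waiting_rpc_table: the context slot holds Some(question context)
    let rpc: Waiter<Vec<u8>, Option<&'static str>> = Waiter::new();
    rpc.add_op_waiter(1, Some("question context"));
    assert_eq!(rpc.get_op_context(1), Some(Some("question context")));
}
```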
@@ -244,6 +247,7 @@ pub struct RPCProcessor {
     crypto: Crypto,
     config: VeilidConfig,
     network_manager: NetworkManager,
+    storage_manager: StorageManager,
     routing_table: RoutingTable,
     inner: Arc<Mutex<RPCProcessorInner>>,
     unlocked_inner: Arc<RPCProcessorUnlockedInner>,
@@ -295,6 +299,7 @@ impl RPCProcessor {
             config: config.clone(),
             network_manager: network_manager.clone(),
             routing_table: network_manager.routing_table(),
+            storage_manager: network_manager.storage_manager(),
             inner: Arc::new(Mutex::new(Self::new_inner())),
             unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)),
         }
@@ -308,33 +313,44 @@ impl RPCProcessor {
         self.routing_table.clone()
     }
 
+    pub fn storage_manager(&self) -> StorageManager {
+        self.storage_manager.clone()
+    }
+
     //////////////////////////////////////////////////////////////////////
 
     #[instrument(level = "debug", skip_all, err)]
     pub async fn startup(&self) -> EyreResult<()> {
-        trace!("startup rpc processor");
-        let mut inner = self.inner.lock();
+        debug!("startup rpc processor");
+        {
+            let mut inner = self.inner.lock();
 
-        let channel = flume::bounded(self.unlocked_inner.queue_size as usize);
-        inner.send_channel = Some(channel.0.clone());
-        inner.stop_source = Some(StopSource::new());
+            let channel = flume::bounded(self.unlocked_inner.queue_size as usize);
+            inner.send_channel = Some(channel.0.clone());
+            inner.stop_source = Some(StopSource::new());
 
-        // spin up N workers
-        trace!(
-            "Spinning up {} RPC workers",
-            self.unlocked_inner.concurrency
-        );
-        for _ in 0..self.unlocked_inner.concurrency {
-            let this = self.clone();
-            let receiver = channel.1.clone();
-            let jh = spawn(Self::rpc_worker(
-                this,
-                inner.stop_source.as_ref().unwrap().token(),
-                receiver,
-            ));
-            inner.worker_join_handles.push(jh);
+            // spin up N workers
+            trace!(
+                "Spinning up {} RPC workers",
+                self.unlocked_inner.concurrency
+            );
+            for _ in 0..self.unlocked_inner.concurrency {
+                let this = self.clone();
+                let receiver = channel.1.clone();
+                let jh = spawn(Self::rpc_worker(
+                    this,
+                    inner.stop_source.as_ref().unwrap().token(),
+                    receiver,
+                ));
+                inner.worker_join_handles.push(jh);
+            }
         }
 
+        // Inform storage manager we are up
+        self.storage_manager
+            .set_rpc_processor(Some(self.clone()))
+            .await;
+
         Ok(())
     }
 
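The reworked startup() above builds one bounded flume channel and spins up `concurrency` workers that all clone the same receiver (flume channels are multi-producer multi-consumer). A reduced sketch of that pattern using plain threads; the real code spawns async tasks and threads a StopSource token through so shutdown can interrupt the workers. Channel capacity, worker count, and message type here are placeholders.

```rust
fn main() {
    let concurrency = 4;
    let (tx, rx) = flume::bounded::<String>(16);

    // spin up N workers, each draining the shared receiver
    let mut handles = Vec::new();
    for n in 0..concurrency {
        let rx = rx.clone();
        handles.push(std::thread::spawn(move || {
            // recv() fails once every sender is dropped, ending the worker
            while let Ok(msg) = rx.recv() {
                println!("worker {n} processing {msg}");
            }
        }));
    }

    for i in 0..8 {
        tx.send(format!("rpc message {i}")).unwrap();
    }
    drop(tx); // close the channel so the workers exit

    for jh in handles {
        jh.join().unwrap();
    }
}
```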
@@ -342,6 +358,9 @@ impl RPCProcessor {
     pub async fn shutdown(&self) {
         debug!("starting rpc processor shutdown");
 
+        // Stop storage manager from using us
+        self.storage_manager.set_rpc_processor(None).await;
+
         // Stop the rpc workers
         let mut unord = FuturesUnordered::new();
         {
@@ -382,44 +401,79 @@ impl RPCProcessor {
 
     /// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
     /// If no node was found in the timeout, this returns None
-    pub async fn search_dht_single_key(
+    async fn search_dht_single_key(
         &self,
-        _node_id: PublicKey,
-        _count: u32,
-        _fanout: u32,
-        _timeout: Option<u64>,
-    ) -> Result<Option<NodeRef>, RPCError> {
-        //let routing_table = self.routing_table();
+        node_id: TypedKey,
+        count: usize,
+        fanout: usize,
+        timeout_us: TimestampDuration,
+        safety_selection: SafetySelection,
+    ) -> TimeoutOr<Result<Option<NodeRef>, RPCError>> {
+        let routing_table = self.routing_table();
 
-        // xxx find node but stop if we find the exact node we want
-        // xxx return whatever node is closest after the timeout
-        Err(RPCError::unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error))
-    }
+        // Routine to call to generate fanout
+        let call_routine = |next_node: NodeRef| {
+            let this = self.clone();
+            async move {
+                match this
+                    .clone()
+                    .rpc_call_find_node(
+                        Destination::direct(next_node).with_safety(safety_selection),
+                        node_id,
+                    )
+                    .await
+                {
+                    Ok(v) => {
+                        let v = network_result_value_or_log!(v => {
+                            // Any other failures, just try the next node
+                            return Ok(None);
+                        });
+                        Ok(Some(v.answer))
+                    }
+                    Err(e) => Err(e),
+                }
+            }
+        };
 
-    /// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
-    pub async fn search_dht_multi_key(
-        &self,
-        _node_id: PublicKey,
-        _count: u32,
-        _fanout: u32,
-        _timeout: Option<u64>,
-    ) -> Result<Vec<NodeRef>, RPCError> {
-        // xxx return closest nodes after the timeout
-        Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error))
+        // Routine to call to check if we're done at each step
+        let check_done = |closest_nodes: &[NodeRef]| {
+            // If the node we want to locate is one of the closest nodes, return it immediately
+            if let Some(out) = closest_nodes
+                .iter()
+                .find(|x| x.node_ids().contains(&node_id))
+            {
+                return Some(out.clone());
+            }
+            None
+        };
+
+        // Call the fanout
+        let fanout_call = FanoutCall::new(
+            routing_table.clone(),
+            node_id,
+            count,
+            fanout,
+            timeout_us,
+            call_routine,
+            check_done,
+        );
+
+        fanout_call.run().await
     }
 
     /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
     /// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form
     pub fn resolve_node(
         &self,
-        node_id: PublicKey,
+        node_id: TypedKey,
+        safety_selection: SafetySelection,
     ) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
         let this = self.clone();
         Box::pin(async move {
             let routing_table = this.routing_table();
 
             // First see if we have the node in our routing table already
-            if let Some(nr) = routing_table.lookup_any_node_ref(node_id) {
+            if let Some(nr) = routing_table.lookup_node_ref(node_id) {
                 // ensure we have some dial info for the entry already,
                 // if not, we should do the find_node anyway
                 if nr.has_any_dial_info() {
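The rewritten search_dht_single_key above hands two closures to the new FanoutCall: call_routine asks each visited node for nodes closer to the target (via rpc_call_find_node), and check_done stops the search as soon as the wanted node id appears among the results. The sketch below is a heavily simplified synchronous stand-in for that control flow; the real FanoutCall is async, issues up to `fanout` calls concurrently, and enforces timeout_us.

```rust
// Visit candidates, expand them through call_routine, stop when check_done fires
fn fanout_search<N: Clone + PartialEq>(
    mut candidates: Vec<N>,
    fanout: usize,
    call_routine: impl Fn(&N) -> Vec<N>,
    check_done: impl Fn(&[N]) -> Option<N>,
) -> Option<N> {
    let mut visited: Vec<N> = Vec::new();
    while let Some(next) = candidates.pop() {
        if visited.contains(&next) {
            continue;
        }
        visited.push(next.clone());
        // Ask this node for nodes closer to the target
        for peer in call_routine(&next).into_iter().take(fanout) {
            candidates.push(peer);
        }
        // Stop as soon as the node we want shows up among the results
        if let Some(found) = check_done(&visited) {
            return Some(found);
        }
    }
    None
}

fn main() {
    // Toy "network": node i only knows node i + 1; search for node 3
    let found = fanout_search(
        vec![0u32],
        4,
        |n| vec![n + 1],
        |seen| seen.iter().copied().find(|n| *n == 3),
    );
    assert_eq!(found, Some(3));
}
```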
@@ -428,21 +482,30 @@ impl RPCProcessor {
             }
 
             // If nobody knows where this node is, ask the DHT for it
-            let (count, fanout, timeout) = {
+            let (node_count, _consensus_count, fanout, timeout) = {
                 let c = this.config.get();
                 (
-                    c.network.dht.resolve_node_count,
-                    c.network.dht.resolve_node_fanout,
-                    c.network.dht.resolve_node_timeout_ms.map(ms_to_us),
+                    c.network.dht.max_find_node_count as usize,
+                    c.network.dht.resolve_node_count as usize,
+                    c.network.dht.resolve_node_fanout as usize,
+                    TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)),
                 )
             };
 
-            let nr = this
-                .search_dht_single_key(node_id, count, fanout, timeout)
-                .await?;
+            // Search in preferred cryptosystem order
+            let nr = match this
+                .search_dht_single_key(node_id, node_count, fanout, timeout, safety_selection)
+                .await
+            {
+                TimeoutOr::Timeout => None,
+                TimeoutOr::Value(Ok(v)) => v,
+                TimeoutOr::Value(Err(e)) => {
+                    return Err(e);
+                }
+            };
 
             if let Some(nr) = &nr {
-                if nr.node_ids().contains_key(&node_id) {
+                if nr.node_ids().contains(&node_id) {
                     // found a close node, but not exact within our configured resolve_node timeout
                     return Ok(None);
                 }
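resolve_node above now separates "the search timed out" from "the search failed": a timeout simply maps to Ok(None), meaning no exact match was found within resolve_node_timeout_ms. A small sketch of that handling; this TimeoutOr definition is an assumption inferred from how the value is matched in the hunk.

```rust
// Either the operation timed out, or it completed with a value
enum TimeoutOr<T> {
    Timeout,
    Value(T),
}

fn unwrap_search<T, E>(r: TimeoutOr<Result<Option<T>, E>>) -> Result<Option<T>, E> {
    match r {
        // A timeout is not an error here: it just means "no node found in time"
        TimeoutOr::Timeout => Ok(None),
        TimeoutOr::Value(Ok(v)) => Ok(v),
        TimeoutOr::Value(Err(e)) => Err(e),
    }
}

fn main() {
    let timed_out: TimeoutOr<Result<Option<u32>, String>> = TimeoutOr::Timeout;
    assert!(matches!(unwrap_search(timed_out), Ok(None)));

    let found: TimeoutOr<Result<Option<u32>, String>> = TimeoutOr::Value(Ok(Some(7)));
    assert!(matches!(unwrap_search(found), Ok(Some(7))));
}
```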
@@ -542,10 +605,7 @@ impl RPCProcessor {
 
         // Prepare route operation
         let sr_hop_count = compiled_route.safety_route.hop_count;
-        let route_operation = RPCOperationRoute {
-            safety_route: compiled_route.safety_route,
-            operation,
-        };
+        let route_operation = RPCOperationRoute::new(compiled_route.safety_route, operation);
         let ssni_route =
             self.get_sender_peer_info(&Destination::direct(compiled_route.first_hop.clone()));
         let operation = RPCOperation::new_statement(
@@ -753,7 +813,7 @@ impl RPCProcessor {
         };
 
         // Get our node info timestamp
-        let our_node_info_ts = own_peer_info.signed_node_info.timestamp();
+        let our_node_info_ts = own_peer_info.signed_node_info().timestamp();
 
         // If the target has seen our node info already don't send it again
         if target.has_seen_our_node_info_ts(routing_domain, our_node_info_ts) {
@@ -998,11 +1058,13 @@ impl RPCProcessor {
     }
 
     /// Issue a question over the network, possibly using an anonymized route
+    /// Optionally keeps a context to be passed to the answer processor when an answer is received
     #[instrument(level = "debug", skip(self, question), err)]
     async fn question(
         &self,
         dest: Destination,
         question: RPCQuestion,
+        context: Option<QuestionContext>,
     ) -> Result<NetworkResult<WaitableReply>, RPCError> {
         // Get sender peer info if we should send that
         let spi = self.get_sender_peer_info(&dest);
@@ -1030,7 +1092,10 @@
         let timeout_us = self.unlocked_inner.timeout_us * (hop_count as u64);
 
         // Set up op id eventual
-        let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id);
+        let handle = self
+            .unlocked_inner
+            .waiting_rpc_table
+            .add_op_waiter(op_id, context);
 
         // Send question
         let bytes: ByteCount = (message.len() as u64).into();
@@ -1072,7 +1137,7 @@
         }))
     }
 
-    // Issue a statement over the network, possibly using an anonymized route
+    /// Issue a statement over the network, possibly using an anonymized route
    #[instrument(level = "debug", skip(self, statement), err)]
    async fn statement(
        &self,
@@ -1128,9 +1193,8 @@
 
         Ok(NetworkResult::value(()))
     }
 
-    // Issue a reply over the network, possibly using an anonymized route
-    // The request must want a response, or this routine fails
+    /// Issue a reply over the network, possibly using an anonymized route
+    /// The request must want a response, or this routine fails
     #[instrument(level = "debug", skip(self, request, answer), err)]
     async fn answer(
         &self,
@@ -1189,6 +1253,55 @@
         Ok(NetworkResult::value(()))
     }
 
+    /// Decoding RPC from the wire
+    /// This performs a capnp decode on the data, and if it passes the capnp schema
+    /// it performs the cryptographic validation required to pass the operation up for processing
+    fn decode_rpc_operation(
+        &self,
+        encoded_msg: &RPCMessageEncoded,
+    ) -> Result<RPCOperation, RPCError> {
+        let reader = encoded_msg.data.get_reader()?;
+        let op_reader = reader
+            .get_root::<veilid_capnp::operation::Reader>()
+            .map_err(RPCError::protocol)
+            .map_err(logthru_rpc!())?;
+        let mut operation = RPCOperation::decode(&op_reader)?;
+
+        // Validate the RPC message
+        self.validate_rpc_operation(&mut operation)?;
+
+        Ok(operation)
+    }
+
+    /// Cryptographic RPC validation
+    /// We do this as part of the RPC network layer to ensure that any RPC operations that are
+    /// processed have already been validated cryptographically and it is not the job of the
+    /// caller or receiver. This does not mean the operation is 'semantically correct'. For
+    /// complex operations that require stateful validation and a more robust context than
+    /// 'signatures', the caller must still perform whatever validation is necessary
+    fn validate_rpc_operation(&self, operation: &mut RPCOperation) -> Result<(), RPCError> {
+        // If this is an answer, get the question context for this answer
+        // If we received an answer for a question we did not ask, this will return an error
+        let question_context = if let RPCOperationKind::Answer(_) = operation.kind() {
+            let op_id = operation.op_id();
+            self.unlocked_inner
+                .waiting_rpc_table
+                .get_op_context(op_id)?
+        } else {
+            None
+        };
+
+        // Validate the RPC operation
+        let validate_context = RPCValidateContext {
+            crypto: self.crypto.clone(),
+            rpc_processor: self.clone(),
+            question_context,
+        };
+        operation.validate(&validate_context)?;
+
+        Ok(())
+    }
+
     //////////////////////////////////////////////////////////////////////
     #[instrument(level = "trace", skip(self, encoded_msg), err)]
     async fn process_rpc_message(
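The new validate_rpc_operation above acts as a gatekeeper: an incoming answer is only accepted when a waiter exists for its op id, and the question context stored at send time is handed to the validator. A toy sketch of that idea with stand-in types; only the lookup-or-reject behavior is taken from the diff.

```rust
use std::collections::HashMap;

// Stand-in for the waiting_rpc_table side of validation
struct Waiting {
    contexts: HashMap<u64, Option<String>>, // op id -> stored question context
}

impl Waiting {
    // Err if we never asked this question; Ok(stored context) otherwise
    fn get_op_context(&self, op_id: u64) -> Result<Option<String>, &'static str> {
        self.contexts
            .get(&op_id)
            .cloned()
            .ok_or("answer for a question we did not ask")
    }
}

fn main() {
    let mut w = Waiting {
        contexts: HashMap::new(),
    };
    w.contexts.insert(7, Some("get_value question".into()));

    // An answer to op 7 validates with its stored context
    assert!(w.get_op_context(7).is_ok());
    // An answer to an unknown op id is rejected before further processing
    assert!(w.get_op_context(99).is_err());
}
```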
@@ -1198,32 +1311,26 @@
         // Decode operation appropriately based on header detail
         let msg = match &encoded_msg.header.detail {
             RPCMessageHeaderDetail::Direct(detail) => {
+                // Decode and validate the RPC operation
+                let operation = match self.decode_rpc_operation(&encoded_msg) {
+                    Ok(v) => v,
+                    Err(e) => return Ok(NetworkResult::invalid_message(e)),
+                };
+
                 // Get the routing domain this message came over
                 let routing_domain = detail.routing_domain;
 
-                // Decode the operation
+                // Get the sender noderef, incorporating sender's peer info
                 let sender_node_id = TypedKey::new(
                     detail.envelope.get_crypto_kind(),
                     detail.envelope.get_sender_id(),
                 );
-
-                // Decode the RPC message
-                let operation = {
-                    let reader = encoded_msg.data.get_reader()?;
-                    let op_reader = reader
-                        .get_root::<veilid_capnp::operation::Reader>()
-                        .map_err(RPCError::protocol)
-                        .map_err(logthru_rpc!())?;
-                    RPCOperation::decode(&op_reader, self.crypto.clone())?
-                };
-
-                // Get the sender noderef, incorporating sender's peer info
                 let mut opt_sender_nr: Option<NodeRef> = None;
                 if let Some(sender_peer_info) = operation.sender_peer_info() {
+                    // Ensure the sender peer info is for the actual sender specified in the envelope
 
                     // Sender PeerInfo was specified, update our routing table with it
-                    if !self.filter_node_info(routing_domain, &sender_peer_info.signed_node_info) {
+                    if !self.filter_node_info(routing_domain, sender_peer_info.signed_node_info()) {
                         return Err(RPCError::invalid_format(
                             "sender peerinfo has invalid peer scope",
                         ));
@@ -1243,7 +1350,8 @@
                 // Update the 'seen our node info' timestamp to determine if this node needs a
                 // 'node info update' ping
                 if let Some(sender_nr) = &opt_sender_nr {
-                    sender_nr.set_our_node_info_ts(routing_domain, operation.target_node_info_ts());
+                    sender_nr
+                        .set_seen_our_node_info_ts(routing_domain, operation.target_node_info_ts());
                 }
 
                 // Make the RPC message
@@ -1254,15 +1362,8 @@
                 }
             }
             RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
-                // Decode the RPC message
-                let operation = {
-                    let reader = encoded_msg.data.get_reader()?;
-                    let op_reader = reader
-                        .get_root::<veilid_capnp::operation::Reader>()
-                        .map_err(RPCError::protocol)
-                        .map_err(logthru_rpc!())?;
-                    RPCOperation::decode(&op_reader, self.crypto.clone())?
-                };
+                // Decode and validate the RPC operation
+                let operation = self.decode_rpc_operation(&encoded_msg)?;
 
                 // Make the RPC message
                 RPCMessage {