checkpoint

This commit is contained in:
John Smith
2022-12-16 20:07:28 -05:00
parent 10a0e3b629
commit 221c09b555
40 changed files with 428 additions and 298 deletions

View File

@@ -53,9 +53,9 @@ impl RPCOperationKind {
#[derive(Debug, Clone)]
pub struct RPCOperation {
op_id: u64,
op_id: OperationId,
sender_node_info: Option<SignedNodeInfo>,
target_node_info_ts: u64,
target_node_info_ts: Timestamp,
kind: RPCOperationKind,
}
@@ -65,7 +65,7 @@ impl RPCOperation {
sender_signed_node_info: SenderSignedNodeInfo,
) -> Self {
Self {
op_id: get_random_u64(),
op_id: OperationId::new(get_random_u64()),
sender_node_info: sender_signed_node_info.signed_node_info,
target_node_info_ts: sender_signed_node_info.target_node_info_ts,
kind: RPCOperationKind::Question(question),
@@ -76,7 +76,7 @@ impl RPCOperation {
sender_signed_node_info: SenderSignedNodeInfo,
) -> Self {
Self {
op_id: get_random_u64(),
op_id: OperationId::new(get_random_u64()),
sender_node_info: sender_signed_node_info.signed_node_info,
target_node_info_ts: sender_signed_node_info.target_node_info_ts,
kind: RPCOperationKind::Statement(statement),
@@ -96,14 +96,14 @@ impl RPCOperation {
}
}
pub fn op_id(&self) -> u64 {
pub fn op_id(&self) -> OperationId {
self.op_id
}
pub fn sender_node_info(&self) -> Option<&SignedNodeInfo> {
self.sender_node_info.as_ref()
}
pub fn target_node_info_ts(&self) -> u64 {
pub fn target_node_info_ts(&self) -> Timestamp {
self.target_node_info_ts
}
@@ -119,7 +119,7 @@ impl RPCOperation {
operation_reader: &veilid_capnp::operation::Reader,
opt_sender_node_id: Option<&DHTKey>,
) -> Result<Self, RPCError> {
let op_id = operation_reader.get_op_id();
let op_id = OperationId::new(operation_reader.get_op_id());
let sender_node_info = if operation_reader.has_sender_node_info() {
if let Some(sender_node_id) = opt_sender_node_id {
@@ -135,7 +135,7 @@ impl RPCOperation {
None
};
let target_node_info_ts = operation_reader.get_target_node_info_ts();
let target_node_info_ts = Timestamp::new(operation_reader.get_target_node_info_ts());
let kind_reader = operation_reader.get_kind();
let kind = RPCOperationKind::decode(&kind_reader)?;
@@ -149,12 +149,12 @@ impl RPCOperation {
}
pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> {
builder.set_op_id(self.op_id);
builder.set_op_id(self.op_id.as_u64());
if let Some(sender_info) = &self.sender_node_info {
let mut si_builder = builder.reborrow().init_sender_node_info();
encode_signed_node_info(&sender_info, &mut si_builder)?;
}
builder.set_target_node_info_ts(self.target_node_info_ts);
builder.set_target_node_info_ts(self.target_node_info_ts.as_u64());
let mut k_builder = builder.reborrow().init_kind();
self.kind.encode(&mut k_builder)?;
Ok(())

View File

@@ -10,7 +10,7 @@ pub fn encode_signed_direct_node_info(
builder
.reborrow()
.set_timestamp(signed_direct_node_info.timestamp);
.set_timestamp(signed_direct_node_info.timestamp.into());
let mut sig_builder = builder.reborrow().init_signature();
let Some(signature) = &signed_direct_node_info.signature else {
@@ -36,7 +36,7 @@ pub fn decode_signed_direct_node_info(
.get_signature()
.map_err(RPCError::protocol)?;
let timestamp = reader.reborrow().get_timestamp();
let timestamp = reader.reborrow().get_timestamp().into();
let signature = decode_signature(&sig_reader);

View File

@@ -16,7 +16,7 @@ pub fn encode_signed_relayed_node_info(
builder
.reborrow()
.set_timestamp(signed_relayed_node_info.timestamp);
.set_timestamp(signed_relayed_node_info.timestamp.into());
let mut sig_builder = builder.reborrow().init_signature();
encode_signature(&signed_relayed_node_info.signature, &mut sig_builder);
@@ -50,7 +50,7 @@ pub fn decode_signed_relayed_node_info(
.reborrow()
.get_signature()
.map_err(RPCError::protocol)?;
let timestamp = reader.reborrow().get_timestamp();
let timestamp = reader.reborrow().get_timestamp().into();
let signature = decode_signature(&sig_reader);

View File

@@ -37,8 +37,6 @@ use stop_token::future::FutureExt;
/////////////////////////////////////////////////////////////////////
type OperationId = u64;
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailDirect {
/// The decoded header of the envelope
@@ -82,9 +80,9 @@ enum RPCMessageHeaderDetail {
#[derive(Debug, Clone)]
struct RPCMessageHeader {
/// Time the message was received, not sent
timestamp: u64,
timestamp: Timestamp,
/// The length in bytes of the rpc message body
body_len: u64,
body_len: ByteCount,
/// The header detail depending on which way the message was received
detail: RPCMessageHeaderDetail,
}
@@ -139,9 +137,9 @@ where
#[derive(Debug)]
struct WaitableReply {
handle: OperationWaitHandle<RPCMessage>,
timeout: u64,
timeout_us: TimestampDuration,
node_ref: NodeRef,
send_ts: u64,
send_ts: Timestamp,
send_data_kind: SendDataKind,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
@@ -152,11 +150,11 @@ struct WaitableReply {
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
pub latency: u64, // how long it took to get this answer
pub answer: T, // the answer itself
pub latency: TimestampDuration, // how long it took to get this answer
pub answer: T, // the answer itself
}
impl<T> Answer<T> {
pub fn new(latency: u64, answer: T) -> Self {
pub fn new(latency: TimestampDuration, answer: T) -> Self {
Self { latency, answer }
}
}
@@ -185,16 +183,16 @@ pub struct SenderSignedNodeInfo {
/// The current signed node info of the sender if required
signed_node_info: Option<SignedNodeInfo>,
/// The last timestamp of the target's node info to assist remote node with sending its latest node info
target_node_info_ts: u64,
target_node_info_ts: Timestamp,
}
impl SenderSignedNodeInfo {
pub fn new_no_sni(target_node_info_ts: u64) -> Self {
pub fn new_no_sni(target_node_info_ts: Timestamp) -> Self {
Self {
signed_node_info: None,
target_node_info_ts,
}
}
pub fn new(sender_signed_node_info: SignedNodeInfo, target_node_info_ts: u64) -> Self {
pub fn new(sender_signed_node_info: SignedNodeInfo, target_node_info_ts: Timestamp) -> Self {
Self {
signed_node_info: Some(sender_signed_node_info),
target_node_info_ts,
@@ -218,7 +216,7 @@ pub struct RPCProcessorInner {
}
pub struct RPCProcessorUnlockedInner {
timeout: u64,
timeout_us: TimestampDuration,
queue_size: u32,
concurrency: u32,
max_route_hop_count: usize,
@@ -267,7 +265,7 @@ impl RPCProcessor {
let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms;
RPCProcessorUnlockedInner {
timeout,
timeout_us: timeout,
queue_size,
concurrency,
max_route_hop_count,
@@ -445,11 +443,11 @@ impl RPCProcessor {
async fn wait_for_reply(
&self,
waitable_reply: WaitableReply,
) -> Result<TimeoutOr<(RPCMessage, u64)>, RPCError> {
) -> Result<TimeoutOr<(RPCMessage, TimestampDuration)>, RPCError> {
let out = self
.unlocked_inner
.waiting_rpc_table
.wait_for_op(waitable_reply.handle, waitable_reply.timeout)
.wait_for_op(waitable_reply.handle, waitable_reply.timeout_us)
.await;
match &out {
Err(_) | Ok(TimeoutOr::Timeout) => {
@@ -463,7 +461,7 @@ impl RPCProcessor {
}
Ok(TimeoutOr::Value((rpcreader, _))) => {
// Reply received
let recv_ts = get_timestamp();
let recv_ts = get_aligned_timestamp();
// Record answer received
self.record_answer_received(
@@ -759,7 +757,7 @@ impl RPCProcessor {
fn record_send_failure(
&self,
rpc_kind: RPCKind,
send_ts: u64,
send_ts: Timestamp,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
@@ -788,7 +786,7 @@ impl RPCProcessor {
/// Record question lost to node or route
fn record_question_lost(
&self,
send_ts: u64,
send_ts: Timestamp,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
@@ -827,8 +825,8 @@ impl RPCProcessor {
fn record_send_success(
&self,
rpc_kind: RPCKind,
send_ts: u64,
bytes: u64,
send_ts: Timestamp,
bytes: ByteCount,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
@@ -863,9 +861,9 @@ impl RPCProcessor {
/// Record answer received from node or route
fn record_answer_received(
&self,
send_ts: u64,
recv_ts: u64,
bytes: u64,
send_ts: Timestamp,
recv_ts: Timestamp,
bytes: ByteCount,
node_ref: NodeRef,
safety_route: Option<DHTKey>,
remote_private_route: Option<DHTKey>,
@@ -1004,7 +1002,7 @@ impl RPCProcessor {
let op_id = operation.op_id();
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest);
trace!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
@@ -1019,14 +1017,14 @@ impl RPCProcessor {
// Calculate answer timeout
// Timeout is number of hops times the timeout per hop
let timeout = self.unlocked_inner.timeout * (hop_count as u64);
let timeout_us = self.unlocked_inner.timeout_us * (hop_count as u64);
// Set up op id eventual
let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id);
// Send question
let bytes = message.len() as u64;
let send_ts = get_timestamp();
let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp();
let send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
@@ -1054,7 +1052,7 @@ impl RPCProcessor {
// Pass back waitable reply completion
Ok(NetworkResult::value(WaitableReply {
handle,
timeout,
timeout_us,
node_ref,
send_ts,
send_data_kind,
@@ -1078,7 +1076,7 @@ impl RPCProcessor {
let operation = RPCOperation::new_statement(statement, ssni);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
trace!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
@@ -1092,8 +1090,8 @@ impl RPCProcessor {
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send statement
let bytes = message.len() as u64;
let send_ts = get_timestamp();
let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp();
let _send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
@@ -1139,7 +1137,7 @@ impl RPCProcessor {
let operation = RPCOperation::new_answer(&request.operation, answer, ssni);
// Log rpc send
trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
trace!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id().as_u64(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
@@ -1153,8 +1151,8 @@ impl RPCProcessor {
} = network_result_try!(self.render_operation(dest, &operation)?);
// Send the reply
let bytes = message.len() as u64;
let send_ts = get_timestamp();
let bytes: ByteCount = (message.len() as u64).into();
let send_ts = get_aligned_timestamp();
network_result_try!(self.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.await
@@ -1284,7 +1282,7 @@ impl RPCProcessor {
};
// Log rpc receive
trace!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id(), desc = msg.operation.kind().desc(), header = ?msg.header);
trace!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header);
// Process specific message kind
match msg.operation.kind() {
@@ -1366,7 +1364,7 @@ impl RPCProcessor {
connection_descriptor,
routing_domain,
}),
timestamp: get_timestamp(),
timestamp: get_aligned_timestamp(),
body_len: body.len() as u64,
},
data: RPCMessageData { contents: body },
@@ -1395,8 +1393,8 @@ impl RPCProcessor {
remote_safety_route,
sequencing,
}),
timestamp: get_timestamp(),
body_len: body.len() as u64,
timestamp: get_aligned_timestamp(),
body_len: (body.len() as u64).into(),
},
data: RPCMessageData { contents: body },
};
@@ -1428,8 +1426,8 @@ impl RPCProcessor {
safety_spec,
},
),
timestamp: get_timestamp(),
body_len: body.len() as u64,
timestamp: get_aligned_timestamp(),
body_len: (body.len() as u64).into(),
},
data: RPCMessageData { contents: body },
};

View File

@@ -104,9 +104,9 @@ where
pub async fn wait_for_op(
&self,
mut handle: OperationWaitHandle<T>,
timeout_us: u64,
) -> Result<TimeoutOr<(T, u64)>, RPCError> {
let timeout_ms = u32::try_from(timeout_us / 1000u64)
timeout_us: TimestampDuration,
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
let timeout_ms = u32::try_from(timeout_us.as_u64() / 1000u64)
.map_err(|e| RPCError::map_internal("invalid timeout")(e))?;
// Take the instance
@@ -114,7 +114,7 @@ where
let eventual_instance = handle.eventual_instance.take().unwrap();
// wait for eventualvalue
let start_ts = get_timestamp();
let start_ts = get_aligned_timestamp();
let res = timeout(timeout_ms, eventual_instance)
.await
.into_timeout_or();
@@ -125,7 +125,7 @@ where
})
.map(|res| {
let (_span_id, ret) = res.take_value().unwrap();
let end_ts = get_timestamp();
let end_ts = get_aligned_timestamp();
//xxx: causes crash (Missing otel data span extensions)
// Span::current().follows_from(span_id);

View File

@@ -73,7 +73,7 @@ impl RPCProcessor {
let res = self
.unlocked_inner
.waiting_app_call_table
.wait_for_op(handle, self.unlocked_inner.timeout)
.wait_for_op(handle, self.unlocked_inner.timeout_us)
.await?;
let (message, _latency) = match res {
TimeoutOr::Timeout => {
@@ -93,7 +93,7 @@ impl RPCProcessor {
}
/// Exposed to API for apps to return app call answers
pub async fn app_call_reply(&self, id: u64, message: Vec<u8>) -> Result<(), RPCError> {
pub async fn app_call_reply(&self, id: OperationId, message: Vec<u8>) -> Result<(), RPCError> {
self.unlocked_inner
.waiting_app_call_table
.complete_op_waiter(id, message)

View File

@@ -196,7 +196,6 @@ impl RPCProcessor {
&routed_operation.data,
sender_id,
)
.map_err(RPCError::protocol)?
else {
return Ok(NetworkResult::invalid_message("signatures did not validate for private route"));
};