veilid/veilid-core/src/receipt_manager.rs

480 lines
15 KiB
Rust
Raw Normal View History

2021-11-22 16:28:30 +00:00
use crate::*;
use core::fmt;
2022-10-30 23:29:31 +00:00
use crypto::*;
2021-11-22 16:28:30 +00:00
use futures_util::stream::{FuturesUnordered, StreamExt};
use network_manager::*;
2022-04-17 17:28:39 +00:00
use routing_table::*;
2022-06-13 00:58:02 +00:00
use stop_token::future::FutureExt;
2021-11-22 16:28:30 +00:00
2022-10-22 01:27:07 +00:00
#[derive(Clone, Debug)]
2021-11-22 16:28:30 +00:00
pub enum ReceiptEvent {
2022-05-28 20:11:50 +00:00
ReturnedOutOfBand,
ReturnedInBand { inbound_noderef: NodeRef },
2022-11-02 19:36:01 +00:00
ReturnedSafety,
2023-02-11 20:54:55 +00:00
ReturnedPrivate { private_route: TypedKey },
2021-11-27 17:44:21 +00:00
Expired,
Cancelled,
2021-11-22 16:28:30 +00:00
}
2022-10-31 03:23:12 +00:00
#[derive(Clone, Debug)]
pub enum ReceiptReturned {
OutOfBand,
InBand { inbound_noderef: NodeRef },
2022-11-02 19:36:01 +00:00
Safety,
2023-02-11 20:54:55 +00:00
Private { private_route: TypedKey },
2022-10-31 03:23:12 +00:00
}
2022-07-12 12:02:22 +00:00
/// Callback invoked by the receipt manager whenever an event occurs for a recorded
/// receipt.
///
/// Implementors must be `Send + 'static` because they are boxed and stored inside the
/// receipt record.
pub trait ReceiptCallback: Send + 'static {
    /// Handles one event for a receipt.
    ///
    /// * `event` - what happened to the receipt.
    /// * `receipt` - a clone of the recorded receipt.
    /// * `returns_so_far` - the record's return counter at the time of the call.
    /// * `expected_returns` - the number of returns after which the record is removed.
    ///
    /// Returns a boxed future that the manager awaits to run the callback.
    fn call(
        &self,
        event: ReceiptEvent,
        receipt: Receipt,
        returns_so_far: u32,
        expected_returns: u32,
    ) -> SendPinBoxFuture<()>;
}
/// Blanket implementation that lets any `Send + 'static` closure or function taking
/// `(ReceiptEvent, Receipt, u32, u32)` and returning a `Send + 'static` future of `()`
/// be used as a `ReceiptCallback`.
impl<F, T> ReceiptCallback for T
where
    T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + Send + 'static,
    F: Future<Output = ()> + Send + 'static,
{
    fn call(
        &self,
        event: ReceiptEvent,
        receipt: Receipt,
        returns_so_far: u32,
        expected_returns: u32,
    ) -> SendPinBoxFuture<()> {
        // Call the wrapped function to obtain its future, then box and pin it so it
        // fits the trait's type-erased return type.
        let future = self(event, receipt, returns_so_far, expected_returns);
        Box::pin(future)
    }
}
/// Boxed, type-erased callback stored in a record created by `ReceiptRecord::new`.
type ReceiptCallbackType = Box<dyn ReceiptCallback>;
/// Eventual resolved with a single `ReceiptEvent`; stored in a record created by
/// `ReceiptRecord::new_single_shot`.
type ReceiptSingleShotType = SingleShotEventual<ReceiptEvent>;
/// How events for a receipt record are delivered.
enum ReceiptRecordCallbackType {
    /// Every event invokes the stored callback.
    Normal(ReceiptCallbackType),
    /// The first event resolves the eventual; it is then taken out of the `Option`, so
    /// later events for the same record are dropped.
    SingleShot(Option<ReceiptSingleShotType>),
}
impl fmt::Debug for ReceiptRecordCallbackType {
    /// Formats as `ReceiptRecordCallbackType::<Variant>`.
    ///
    /// Only the variant name is printed; the payloads are skipped. The name is written
    /// straight from a static string so no temporary `String` is allocated per call.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Normal(_) => "ReceiptRecordCallbackType::Normal",
            Self::SingleShot(_) => "ReceiptRecordCallbackType::SingleShot",
        })
    }
}
pub struct ReceiptRecord {
2022-12-17 01:07:28 +00:00
expiration_ts: Timestamp,
2022-05-28 20:11:50 +00:00
receipt: Receipt,
2021-11-22 16:28:30 +00:00
expected_returns: u32,
returns_so_far: u32,
receipt_callback: ReceiptRecordCallbackType,
}
impl fmt::Debug for ReceiptRecord {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReceiptRecord")
.field("expiration_ts", &self.expiration_ts)
2022-05-28 20:11:50 +00:00
.field("receipt", &self.receipt)
2021-11-22 16:28:30 +00:00
.field("expected_returns", &self.expected_returns)
.field("returns_so_far", &self.returns_so_far)
.field("receipt_callback", &self.receipt_callback)
.finish()
}
}
impl ReceiptRecord {
2022-05-28 14:07:57 +00:00
pub fn new(
2022-05-28 20:11:50 +00:00
receipt: Receipt,
2022-12-17 01:07:28 +00:00
expiration_ts: Timestamp,
2021-11-22 16:28:30 +00:00
expected_returns: u32,
receipt_callback: impl ReceiptCallback,
) -> Self {
Self {
2021-11-27 17:44:21 +00:00
expiration_ts,
2022-05-28 20:11:50 +00:00
receipt,
2021-11-27 17:44:21 +00:00
expected_returns,
2021-11-22 16:28:30 +00:00
returns_so_far: 0u32,
receipt_callback: ReceiptRecordCallbackType::Normal(Box::new(receipt_callback)),
}
}
2022-05-28 14:07:57 +00:00
pub fn new_single_shot(
2022-05-28 20:11:50 +00:00
receipt: Receipt,
2022-12-17 01:07:28 +00:00
expiration_ts: Timestamp,
2021-11-22 16:28:30 +00:00
eventual: ReceiptSingleShotType,
) -> Self {
Self {
2021-11-27 17:44:21 +00:00
expiration_ts,
2022-05-28 20:11:50 +00:00
receipt,
2021-11-22 16:28:30 +00:00
returns_so_far: 0u32,
expected_returns: 1u32,
receipt_callback: ReceiptRecordCallbackType::SingleShot(Some(eventual)),
}
}
}
/* XXX: may be useful for O(1) timestamp expiration
#[derive(Clone, Debug)]
struct ReceiptRecordTimestampSort {
2022-12-17 01:07:28 +00:00
expiration_ts: Timestamp,
2021-11-22 16:28:30 +00:00
record: Arc<Mutex<ReceiptRecord>>,
}
impl PartialEq for ReceiptRecordTimestampSort {
fn eq(&self, other: &ReceiptRecordTimestampSort) -> bool {
self.expiration_ts == other.expiration_ts
}
}
impl Eq for ReceiptRecordTimestampSort {}
impl Ord for ReceiptRecordTimestampSort {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.expiration_ts.cmp(&other.expiration_ts).reverse()
}
}
impl PartialOrd for ReceiptRecordTimestampSort {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(&other))
}
}
*/
///////////////////////////////////
pub struct ReceiptManagerInner {
network_manager: NetworkManager,
2023-02-08 02:44:50 +00:00
records_by_nonce: BTreeMap<Nonce, Arc<Mutex<ReceiptRecord>>>,
2022-12-17 01:07:28 +00:00
next_oldest_ts: Option<Timestamp>,
2022-06-13 00:58:02 +00:00
stop_source: Option<StopSource>,
timeout_task: MustJoinSingleFuture<()>,
2021-11-22 16:28:30 +00:00
}
/// Cloneable handle to the receipt manager.
///
/// Clones share the same `ReceiptManagerInner` through an `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct ReceiptManager {
    inner: Arc<Mutex<ReceiptManagerInner>>,
}
impl ReceiptManager {
fn new_inner(network_manager: NetworkManager) -> ReceiptManagerInner {
ReceiptManagerInner {
2021-11-27 17:44:21 +00:00
network_manager,
2022-05-28 14:07:57 +00:00
records_by_nonce: BTreeMap::new(),
2021-11-22 16:28:30 +00:00
next_oldest_ts: None,
2022-06-13 00:58:02 +00:00
stop_source: None,
timeout_task: MustJoinSingleFuture::new(),
2021-11-22 16:28:30 +00:00
}
}
/// Creates a receipt manager with empty state bound to `network_manager`.
pub fn new(network_manager: NetworkManager) -> Self {
    let inner = Self::new_inner(network_manager);
    Self {
        inner: Arc::new(Mutex::new(inner)),
    }
}
/// Returns a clone of the network manager handle this manager was created with.
pub fn network_manager(&self) -> NetworkManager {
    let inner = self.inner.lock();
    inner.network_manager.clone()
}
2022-07-10 21:36:50 +00:00
pub async fn startup(&self) -> EyreResult<()> {
2022-03-10 14:51:53 +00:00
trace!("startup receipt manager");
2021-11-22 16:28:30 +00:00
// Retrieve config
2022-06-13 00:58:02 +00:00
{
// let config = self.core().config();
// let c = config.get();
let mut inner = self.inner.lock();
inner.stop_source = Some(StopSource::new());
}
2021-11-22 16:28:30 +00:00
Ok(())
}
fn perform_callback(
evt: ReceiptEvent,
record_mut: &mut ReceiptRecord,
2022-07-14 20:57:34 +00:00
) -> Option<SendPinBoxFuture<()>> {
2021-11-22 16:28:30 +00:00
match &mut record_mut.receipt_callback {
ReceiptRecordCallbackType::Normal(callback) => Some(callback.call(
evt,
2022-05-28 20:11:50 +00:00
record_mut.receipt.clone(),
2021-11-22 16:28:30 +00:00
record_mut.returns_so_far,
record_mut.expected_returns,
)),
ReceiptRecordCallbackType::SingleShot(eventual) => {
// resolve this eventual with the receiptevent
// don't need to wait for the instance to receive it
// because this can only happen once
if let Some(eventual) = eventual.take() {
eventual.resolve(evt);
}
None
}
}
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self))]
2022-12-17 01:07:28 +00:00
pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) {
2021-11-22 16:28:30 +00:00
// Go through all receipts and build a list of expired nonces
2022-12-17 01:07:28 +00:00
let mut new_next_oldest_ts: Option<Timestamp> = None;
2021-11-22 16:28:30 +00:00
let mut expired_records = Vec::new();
{
let mut inner = self.inner.lock();
let mut expired_nonces = Vec::new();
2022-05-28 14:07:57 +00:00
for (k, v) in &inner.records_by_nonce {
2021-11-22 16:28:30 +00:00
let receipt_inner = v.lock();
if receipt_inner.expiration_ts <= now {
// Expire this receipt
2021-11-27 17:44:21 +00:00
expired_nonces.push(*k);
2021-11-22 16:28:30 +00:00
} else if new_next_oldest_ts.is_none()
|| receipt_inner.expiration_ts < new_next_oldest_ts.unwrap()
{
// Mark the next oldest timestamp we would need to take action on as we go through everything
new_next_oldest_ts = Some(receipt_inner.expiration_ts);
}
}
2021-11-27 17:44:21 +00:00
if expired_nonces.is_empty() {
2021-11-22 16:28:30 +00:00
return;
}
// Now remove the expired receipts
for e in expired_nonces {
2022-05-28 14:07:57 +00:00
let expired_record = inner.records_by_nonce.remove(&e).expect("key should exist");
2021-11-22 16:28:30 +00:00
expired_records.push(expired_record);
}
// Update the next oldest timestamp
inner.next_oldest_ts = new_next_oldest_ts;
}
let mut callbacks = FuturesUnordered::new();
for expired_record in expired_records {
let mut expired_record_mut = expired_record.lock();
if let Some(callback) =
2021-11-27 17:44:21 +00:00
Self::perform_callback(ReceiptEvent::Expired, &mut expired_record_mut)
2021-11-22 16:28:30 +00:00
{
callbacks.push(callback.instrument(Span::current()))
2021-11-22 16:28:30 +00:00
}
}
// Wait on all the multi-call callbacks
2022-06-13 00:58:02 +00:00
loop {
match callbacks.next().timeout_at(stop_token.clone()).await {
Ok(Some(_)) => {}
Ok(None) | Err(_) => break,
}
}
2021-11-22 16:28:30 +00:00
}
2022-07-10 21:36:50 +00:00
pub async fn tick(&self) -> EyreResult<()> {
2022-06-13 00:58:02 +00:00
let (next_oldest_ts, timeout_task, stop_token) = {
2021-11-22 16:28:30 +00:00
let inner = self.inner.lock();
2022-06-13 00:58:02 +00:00
let stop_token = match inner.stop_source.as_ref() {
Some(ss) => ss.token(),
None => {
// Do nothing if we're shutting down
return Ok(());
}
};
(inner.next_oldest_ts, inner.timeout_task.clone(), stop_token)
2021-11-22 16:28:30 +00:00
};
2022-12-17 01:07:28 +00:00
let now = get_aligned_timestamp();
2021-11-22 16:28:30 +00:00
// If we have at least one timestamp to expire, lets do it
if let Some(next_oldest_ts) = next_oldest_ts {
if now >= next_oldest_ts {
// Single-spawn the timeout task routine
let _ = timeout_task
2022-06-13 00:58:02 +00:00
.single_spawn(self.clone().timeout_task_routine(now, stop_token))
2021-11-22 16:28:30 +00:00
.await;
}
}
Ok(())
}
pub async fn shutdown(&self) {
2022-06-15 18:05:04 +00:00
debug!("starting receipt manager shutdown");
2021-11-22 16:28:30 +00:00
let network_manager = self.network_manager();
2022-06-13 00:58:02 +00:00
// Stop all tasks
let timeout_task = {
let mut inner = self.inner.lock();
// Drop the stop
drop(inner.stop_source.take());
inner.timeout_task.clone()
};
// Wait for everything to stop
2022-06-15 18:05:04 +00:00
debug!("waiting for timeout task to stop");
2022-06-13 00:58:02 +00:00
if !timeout_task.join().await.is_ok() {
panic!("joining timeout task failed");
}
2021-11-22 16:28:30 +00:00
*self.inner.lock() = Self::new_inner(network_manager);
2022-06-15 18:05:04 +00:00
debug!("finished receipt manager shutdown");
2021-11-22 16:28:30 +00:00
}
pub fn record_receipt(
&self,
2022-05-28 20:11:50 +00:00
receipt: Receipt,
2022-12-17 01:07:28 +00:00
expiration: Timestamp,
2021-11-22 16:28:30 +00:00
expected_returns: u32,
callback: impl ReceiptCallback,
) {
2022-05-28 20:11:50 +00:00
let receipt_nonce = receipt.get_nonce();
2022-05-28 14:07:57 +00:00
log_rpc!(debug "== New Multiple Receipt ({}) {} ", expected_returns, receipt_nonce.encode());
let record = Arc::new(Mutex::new(ReceiptRecord::new(
2022-05-28 20:11:50 +00:00
receipt,
2021-11-22 16:28:30 +00:00
expiration,
expected_returns,
callback,
)));
let mut inner = self.inner.lock();
2022-05-28 14:07:57 +00:00
inner.records_by_nonce.insert(receipt_nonce, record);
2022-05-26 00:56:13 +00:00
Self::update_next_oldest_timestamp(&mut *inner);
2021-11-22 16:28:30 +00:00
}
pub fn record_single_shot_receipt(
&self,
2022-05-28 20:11:50 +00:00
receipt: Receipt,
2022-12-17 01:07:28 +00:00
expiration: Timestamp,
2021-11-22 16:28:30 +00:00
eventual: ReceiptSingleShotType,
) {
2022-05-28 20:11:50 +00:00
let receipt_nonce = receipt.get_nonce();
2022-05-28 14:07:57 +00:00
log_rpc!(debug "== New SingleShot Receipt {}", receipt_nonce.encode());
2022-05-26 00:56:13 +00:00
2022-05-28 14:07:57 +00:00
let record = Arc::new(Mutex::new(ReceiptRecord::new_single_shot(
2022-05-28 20:11:50 +00:00
receipt, expiration, eventual,
2021-11-22 16:28:30 +00:00
)));
let mut inner = self.inner.lock();
2022-05-28 14:07:57 +00:00
inner.records_by_nonce.insert(receipt_nonce, record);
2022-05-26 00:56:13 +00:00
Self::update_next_oldest_timestamp(&mut *inner);
2021-11-22 16:28:30 +00:00
}
fn update_next_oldest_timestamp(inner: &mut ReceiptManagerInner) {
// Update the next oldest timestamp
2022-12-17 01:07:28 +00:00
let mut new_next_oldest_ts: Option<Timestamp> = None;
2022-05-28 14:07:57 +00:00
for v in inner.records_by_nonce.values() {
2021-11-22 16:28:30 +00:00
let receipt_inner = v.lock();
if new_next_oldest_ts.is_none()
|| receipt_inner.expiration_ts < new_next_oldest_ts.unwrap()
{
// Mark the next oldest timestamp we would need to take action on as we go through everything
new_next_oldest_ts = Some(receipt_inner.expiration_ts);
}
}
inner.next_oldest_ts = new_next_oldest_ts;
}
2023-02-08 02:44:50 +00:00
pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> {
2022-05-26 00:56:13 +00:00
log_rpc!(debug "== Cancel Receipt {}", nonce.encode());
2021-11-22 16:28:30 +00:00
// Remove the record
let record = {
let mut inner = self.inner.lock();
2022-05-28 14:07:57 +00:00
let record = match inner.records_by_nonce.remove(nonce) {
2021-11-22 16:28:30 +00:00
Some(r) => r,
None => {
2022-07-10 21:36:50 +00:00
bail!("receipt not recorded");
2021-11-22 16:28:30 +00:00
}
};
Self::update_next_oldest_timestamp(&mut *inner);
record
};
// Generate a cancelled callback
let callback_future = {
let mut record_mut = record.lock();
2021-11-27 17:44:21 +00:00
Self::perform_callback(ReceiptEvent::Cancelled, &mut record_mut)
2021-11-22 16:28:30 +00:00
};
// Issue the callback
if let Some(callback_future) = callback_future {
callback_future.await;
}
Ok(())
}
2022-05-28 14:07:57 +00:00
pub async fn handle_receipt(
&self,
2022-05-28 20:11:50 +00:00
receipt: Receipt,
2022-10-31 03:23:12 +00:00
receipt_returned: ReceiptReturned,
2022-07-20 13:39:38 +00:00
) -> NetworkResult<()> {
2022-05-28 20:11:50 +00:00
let receipt_nonce = receipt.get_nonce();
let extra_data = receipt.get_extra_data();
2022-05-28 14:07:57 +00:00
log_rpc!(debug "<<== RECEIPT {} <- {}{}",
receipt_nonce.encode(),
2022-10-31 03:23:12 +00:00
match receipt_returned {
ReceiptReturned::OutOfBand => "OutOfBand".to_owned(),
ReceiptReturned::InBand { ref inbound_noderef } => format!("InBand({})", inbound_noderef),
2022-11-02 19:36:01 +00:00
ReceiptReturned::Safety => "Safety".to_owned(),
2022-11-02 01:05:48 +00:00
ReceiptReturned::Private { ref private_route } => format!("Private({})", private_route),
2022-05-28 14:07:57 +00:00
},
if extra_data.is_empty() {
"".to_owned()
} else {
format!("[{} extra]", extra_data.len())
}
);
2022-05-26 00:56:13 +00:00
2021-11-22 16:28:30 +00:00
// Increment return count
2022-06-13 00:58:02 +00:00
let (callback_future, stop_token) = {
2021-11-22 16:28:30 +00:00
// Look up the receipt record from the nonce
let mut inner = self.inner.lock();
2022-06-13 00:58:02 +00:00
let stop_token = match inner.stop_source.as_ref() {
Some(ss) => ss.token(),
None => {
// If we're stopping do nothing here
2022-07-20 13:39:38 +00:00
return NetworkResult::value(());
2022-06-13 00:58:02 +00:00
}
};
2022-05-28 14:07:57 +00:00
let record = match inner.records_by_nonce.get(&receipt_nonce) {
2021-11-22 16:28:30 +00:00
Some(r) => r.clone(),
None => {
2022-07-20 13:39:38 +00:00
return NetworkResult::invalid_message("receipt not recorded");
2021-11-22 16:28:30 +00:00
}
};
// Generate the callback future
let mut record_mut = record.lock();
record_mut.returns_so_far += 1;
2022-05-28 14:07:57 +00:00
// Get the receipt event to return
2022-10-31 03:23:12 +00:00
let receipt_event = match receipt_returned {
ReceiptReturned::OutOfBand => ReceiptEvent::ReturnedOutOfBand,
2022-11-02 19:36:01 +00:00
ReceiptReturned::Safety => ReceiptEvent::ReturnedSafety,
2022-10-31 03:23:12 +00:00
ReceiptReturned::InBand {
ref inbound_noderef,
} => ReceiptEvent::ReturnedInBand {
inbound_noderef: inbound_noderef.clone(),
},
2022-11-02 01:05:48 +00:00
ReceiptReturned::Private { ref private_route } => ReceiptEvent::ReturnedPrivate {
private_route: private_route.clone(),
},
2022-05-28 14:07:57 +00:00
};
let callback_future = Self::perform_callback(receipt_event, &mut record_mut);
2021-11-22 16:28:30 +00:00
// Remove the record if we're done
if record_mut.returns_so_far == record_mut.expected_returns {
2022-05-28 14:07:57 +00:00
inner.records_by_nonce.remove(&receipt_nonce);
2021-11-22 16:28:30 +00:00
Self::update_next_oldest_timestamp(&mut *inner);
}
2022-06-13 00:58:02 +00:00
(callback_future, stop_token)
2021-11-22 16:28:30 +00:00
};
// Issue the callback
if let Some(callback_future) = callback_future {
2022-06-13 00:58:02 +00:00
let _ = callback_future.timeout_at(stop_token).await;
2021-11-22 16:28:30 +00:00
}
2022-07-20 13:39:38 +00:00
NetworkResult::value(())
2021-11-22 16:28:30 +00:00
}
}