receipt rework and discovery rework

This commit is contained in:
John Smith
2022-05-28 10:07:57 -04:00
parent d80a81e460
commit b6e568f664
23 changed files with 817 additions and 431 deletions

View File

@@ -8,7 +8,13 @@ use xx::*;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ReceiptEvent {
Returned(NodeRef),
ReturnedOutOfBand {
extra_data: Vec<u8>,
},
ReturnedInBand {
inbound_noderef: NodeRef,
extra_data: Vec<u8>,
},
Expired,
Cancelled,
}
@@ -108,29 +114,29 @@ impl fmt::Debug for ReceiptRecord {
}
impl ReceiptRecord {
pub fn from_receipt(
receipt: &Receipt,
pub fn new(
receipt_nonce: ReceiptNonce,
expiration_ts: u64,
expected_returns: u32,
receipt_callback: impl ReceiptCallback,
) -> Self {
Self {
expiration_ts,
nonce: receipt.get_nonce(),
nonce: receipt_nonce,
expected_returns,
returns_so_far: 0u32,
receipt_callback: ReceiptRecordCallbackType::Normal(Box::new(receipt_callback)),
}
}
pub fn from_single_shot_receipt(
receipt: &Receipt,
pub fn new_single_shot(
receipt_nonce: ReceiptNonce,
expiration_ts: u64,
eventual: ReceiptSingleShotType,
) -> Self {
Self {
expiration_ts,
nonce: receipt.get_nonce(),
nonce: receipt_nonce,
returns_so_far: 0u32,
expected_returns: 1u32,
receipt_callback: ReceiptRecordCallbackType::SingleShot(Some(eventual)),
@@ -167,7 +173,7 @@ impl PartialOrd for ReceiptRecordTimestampSort {
pub struct ReceiptManagerInner {
network_manager: NetworkManager,
receipts_by_nonce: BTreeMap<ReceiptNonce, Arc<Mutex<ReceiptRecord>>>,
records_by_nonce: BTreeMap<ReceiptNonce, Arc<Mutex<ReceiptRecord>>>,
next_oldest_ts: Option<u64>,
timeout_task: SingleFuture<()>,
}
@@ -181,7 +187,7 @@ impl ReceiptManager {
fn new_inner(network_manager: NetworkManager) -> ReceiptManagerInner {
ReceiptManagerInner {
network_manager,
receipts_by_nonce: BTreeMap::new(),
records_by_nonce: BTreeMap::new(),
next_oldest_ts: None,
timeout_task: SingleFuture::new(),
}
@@ -240,7 +246,7 @@ impl ReceiptManager {
{
let mut inner = self.inner.lock();
let mut expired_nonces = Vec::new();
for (k, v) in &inner.receipts_by_nonce {
for (k, v) in &inner.records_by_nonce {
let receipt_inner = v.lock();
if receipt_inner.expiration_ts <= now {
// Expire this receipt
@@ -257,10 +263,7 @@ impl ReceiptManager {
}
// Now remove the expired receipts
for e in expired_nonces {
let expired_record = inner
.receipts_by_nonce
.remove(&e)
.expect("key should exist");
let expired_record = inner.records_by_nonce.remove(&e).expect("key should exist");
expired_records.push(expired_record);
}
// Update the next oldest timestamp
@@ -305,37 +308,39 @@ impl ReceiptManager {
pub fn record_receipt(
&self,
receipt: Receipt,
receipt_nonce: ReceiptNonce,
expiration: u64,
expected_returns: u32,
callback: impl ReceiptCallback,
) {
log_rpc!(debug "== New Multiple Receipt ({}) {} ", expected_returns, receipt.get_nonce().encode());
let record = Arc::new(Mutex::new(ReceiptRecord::from_receipt(
&receipt,
log_rpc!(debug "== New Multiple Receipt ({}) {} ", expected_returns, receipt_nonce.encode());
let record = Arc::new(Mutex::new(ReceiptRecord::new(
receipt_nonce,
expiration,
expected_returns,
callback,
)));
let mut inner = self.inner.lock();
inner.receipts_by_nonce.insert(receipt.get_nonce(), record);
inner.records_by_nonce.insert(receipt_nonce, record);
Self::update_next_oldest_timestamp(&mut *inner);
}
pub fn record_single_shot_receipt(
&self,
receipt: Receipt,
receipt_nonce: ReceiptNonce,
expiration: u64,
eventual: ReceiptSingleShotType,
) {
log_rpc!(debug "== New SingleShot Receipt {}", receipt.get_nonce().encode());
log_rpc!(debug "== New SingleShot Receipt {}", receipt_nonce.encode());
let record = Arc::new(Mutex::new(ReceiptRecord::from_single_shot_receipt(
&receipt, expiration, eventual,
let record = Arc::new(Mutex::new(ReceiptRecord::new_single_shot(
receipt_nonce,
expiration,
eventual,
)));
let mut inner = self.inner.lock();
inner.receipts_by_nonce.insert(receipt.get_nonce(), record);
inner.records_by_nonce.insert(receipt_nonce, record);
Self::update_next_oldest_timestamp(&mut *inner);
}
@@ -343,7 +348,7 @@ impl ReceiptManager {
fn update_next_oldest_timestamp(inner: &mut ReceiptManagerInner) {
// Update the next oldest timestamp
let mut new_next_oldest_ts: Option<u64> = None;
for v in inner.receipts_by_nonce.values() {
for v in inner.records_by_nonce.values() {
let receipt_inner = v.lock();
if new_next_oldest_ts.is_none()
|| receipt_inner.expiration_ts < new_next_oldest_ts.unwrap()
@@ -362,7 +367,7 @@ impl ReceiptManager {
// Remove the record
let record = {
let mut inner = self.inner.lock();
let record = match inner.receipts_by_nonce.remove(nonce) {
let record = match inner.records_by_nonce.remove(nonce) {
Some(r) => r,
None => {
return Err("receipt not recorded".to_owned());
@@ -386,14 +391,31 @@ impl ReceiptManager {
Ok(())
}
pub async fn handle_receipt(&self, node_ref: NodeRef, receipt: Receipt) -> Result<(), String> {
log_rpc!(debug "<<== RECEIPT {} <- {}", receipt.get_nonce().encode(), node_ref);
pub async fn handle_receipt(
&self,
receipt_nonce: ReceiptNonce,
extra_data: Vec<u8>,
inbound_noderef: Option<NodeRef>,
) -> Result<(), String> {
log_rpc!(debug "<<== RECEIPT {} <- {}{}",
receipt_nonce.encode(),
if let Some(nr) = &inbound_noderef {
nr.to_string()
} else {
"DIRECT".to_owned()
},
if extra_data.is_empty() {
"".to_owned()
} else {
format!("[{} extra]", extra_data.len())
}
);
// Increment return count
let callback_future = {
// Look up the receipt record from the nonce
let mut inner = self.inner.lock();
let record = match inner.receipts_by_nonce.get(&receipt.get_nonce()) {
let record = match inner.records_by_nonce.get(&receipt_nonce) {
Some(r) => r.clone(),
None => {
return Err("receipt not recorded".to_owned());
@@ -402,12 +424,22 @@ impl ReceiptManager {
// Generate the callback future
let mut record_mut = record.lock();
record_mut.returns_so_far += 1;
let callback_future =
Self::perform_callback(ReceiptEvent::Returned(node_ref), &mut record_mut);
// Get the receipt event to return
let receipt_event = if let Some(inbound_noderef) = inbound_noderef {
ReceiptEvent::ReturnedInBand {
inbound_noderef,
extra_data,
}
} else {
ReceiptEvent::ReturnedOutOfBand { extra_data }
};
let callback_future = Self::perform_callback(receipt_event, &mut record_mut);
// Remove the record if we're done
if record_mut.returns_so_far == record_mut.expected_returns {
inner.receipts_by_nonce.remove(&receipt.get_nonce());
inner.records_by_nonce.remove(&receipt_nonce);
Self::update_next_oldest_timestamp(&mut *inner);
}