veilid/veilid-core/src/network_manager/mod.rs

1321 lines
49 KiB
Rust
Raw Normal View History

2022-11-27 02:37:23 +00:00
use crate::*;
2022-05-31 23:54:52 +00:00
#[cfg(not(target_arch = "wasm32"))]
mod native;
#[cfg(target_arch = "wasm32")]
mod wasm;
2023-07-02 04:17:04 +00:00
mod address_filter;
2023-07-04 04:24:55 +00:00
mod connection_handle;
2022-05-31 23:54:52 +00:00
mod connection_manager;
mod connection_table;
2023-07-04 04:24:55 +00:00
mod direct_boot;
2022-05-31 23:54:52 +00:00
mod network_connection;
2023-07-04 04:24:55 +00:00
mod send_data;
mod stats;
2022-07-22 17:05:28 +00:00
mod tasks;
2023-05-29 19:24:57 +00:00
mod types;
2022-05-31 23:54:52 +00:00
pub mod tests;
////////////////////////////////////////////////////////////////////////////////////////
2022-09-08 01:52:08 +00:00
pub use connection_manager::*;
2023-07-04 04:24:55 +00:00
pub use direct_boot::*;
2022-05-31 23:54:52 +00:00
pub use network_connection::*;
pub use send_data::*;
pub use stats::*;
2023-07-04 04:24:55 +00:00
pub use types::*;
2022-05-31 23:54:52 +00:00
////////////////////////////////////////////////////////////////////////////////////////
2023-07-02 04:17:04 +00:00
use address_filter::*;
2023-07-04 04:24:55 +00:00
use connection_handle::*;
2022-10-30 23:29:31 +00:00
use crypto::*;
2022-12-08 17:48:01 +00:00
use futures_util::stream::FuturesUnordered;
2022-03-20 14:52:03 +00:00
use hashlink::LruCache;
2021-11-22 16:28:30 +00:00
use intf::*;
2022-05-31 23:54:52 +00:00
#[cfg(not(target_arch = "wasm32"))]
use native::*;
2023-07-15 20:18:13 +00:00
#[cfg(not(target_arch = "wasm32"))]
pub use native::{LOCAL_NETWORK_CAPABILITIES, MAX_CAPABILITIES, PUBLIC_INTERNET_CAPABILITIES};
2021-11-22 16:28:30 +00:00
use receipt_manager::*;
use routing_table::*;
2022-04-17 17:28:39 +00:00
use rpc_processor::*;
2023-05-29 19:24:57 +00:00
use storage_manager::*;
2022-05-31 23:54:52 +00:00
#[cfg(target_arch = "wasm32")]
use wasm::*;
2023-07-15 20:18:13 +00:00
#[cfg(target_arch = "wasm32")]
pub use wasm::{LOCAL_NETWORK_CAPABILITIES, MAX_CAPABILITIES, PUBLIC_INTERNET_CAPABILITIES};
2021-11-22 16:28:30 +00:00
////////////////////////////////////////////////////////////////////////////////////////
pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
2022-03-19 22:19:40 +00:00
pub const IPADDR_TABLE_SIZE: usize = 1024;
2023-07-04 04:24:55 +00:00
pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
pub const NODE_CONTACT_METHOD_CACHE_SIZE: usize = 1024;
2022-08-27 02:52:08 +00:00
pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8;
pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
2023-07-04 04:24:55 +00:00
pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration =
TimestampDuration::new(3600_000_000u64); // 60 minutes
2023-07-02 04:17:04 +00:00
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
2022-06-25 14:57:33 +00:00
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
2022-07-22 17:05:28 +00:00
2021-12-22 03:20:55 +00:00
#[derive(Copy, Clone, Debug, Default)]
pub struct ProtocolConfig {
2022-08-02 01:06:31 +00:00
pub outbound: ProtocolTypeSet,
pub inbound: ProtocolTypeSet,
pub family_global: AddressTypeSet,
pub family_local: AddressTypeSet,
2021-12-24 01:34:52 +00:00
}
2021-11-22 16:28:30 +00:00
// Things we get when we start up and go away when we shut down
// Routing table is not in here because we want it to survive a network shutdown/startup restart
#[derive(Clone)]
struct NetworkComponents {
net: Network,
connection_manager: ConnectionManager,
2021-11-22 16:28:30 +00:00
rpc_processor: RPCProcessor,
receipt_manager: ReceiptManager,
}
2022-06-15 19:03:13 +00:00
#[derive(Debug)]
struct ClientWhitelistEntry {
2022-12-17 01:07:28 +00:00
last_seen_ts: Timestamp,
}
2022-04-16 15:18:54 +00:00
2022-10-13 02:53:40 +00:00
/// How a piece of data was sent; this is the success payload of `send_envelope`.
#[derive(Debug, Clone, Copy)]
pub enum SendDataKind {
    /// Sent directly; carries the connection descriptor that was used.
    Direct(ConnectionDescriptor),
    /// Sent indirectly; no connection descriptor is recorded.
    Indirect,
    /// Sent over an existing connection; carries its connection descriptor.
    Existing(ConnectionDescriptor),
}
2022-08-27 02:52:08 +00:00
/// Mechanism required to contact another node
#[derive(Clone, Debug)]
2022-10-13 02:53:40 +00:00
pub(crate) enum NodeContactMethod {
2022-08-27 02:52:08 +00:00
/// Node is not reachable by any means
Unreachable,
2022-10-13 02:53:40 +00:00
/// Connection should have already existed
Existing,
2022-08-27 02:52:08 +00:00
/// Contact the node directly
Direct(DialInfo),
2022-10-13 02:53:40 +00:00
/// Request via signal the node connect back directly (relay, target)
2022-08-27 02:52:08 +00:00
SignalReverse(NodeRef, NodeRef),
2023-02-22 23:29:07 +00:00
/// Request via signal the node negotiate a hole punch (relay, target)
2022-08-27 02:52:08 +00:00
SignalHolePunch(NodeRef, NodeRef),
/// Must use an inbound relay to reach the node
InboundRelay(NodeRef),
/// Must use outbound relay to reach the node
OutboundRelay(NodeRef),
2022-04-21 00:49:16 +00:00
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct NodeContactMethodCacheKey {
2023-07-14 23:14:12 +00:00
own_node_info_ts: Timestamp,
target_node_info_ts: Timestamp,
target_node_ref_filter: Option<NodeRefFilter>,
2023-06-25 21:59:11 +00:00
target_node_ref_sequencing: Sequencing,
}
2022-04-21 00:49:16 +00:00
2022-08-09 00:42:27 +00:00
/// Key of the public address check tables: a `(protocol type, address type)` pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct PublicAddressCheckCacheKey(ProtocolType, AddressType);
2021-11-22 16:28:30 +00:00
// The mutable state of the network manager
2022-03-19 22:19:40 +00:00
struct NetworkManagerInner {
stats: NetworkManagerStats,
2023-02-11 20:54:55 +00:00
client_whitelist: LruCache<TypedKey, ClientWhitelistEntry>,
node_contact_method_cache: LruCache<NodeContactMethodCacheKey, NodeContactMethod>,
2022-08-09 00:42:27 +00:00
public_address_check_cache:
2022-08-27 02:52:08 +00:00
BTreeMap<PublicAddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
public_address_inconsistencies_table:
2022-12-17 02:55:03 +00:00
BTreeMap<PublicAddressCheckCacheKey, HashMap<IpAddr, Timestamp>>,
2022-03-19 22:19:40 +00:00
}
struct NetworkManagerUnlockedInner {
2022-10-10 02:07:15 +00:00
// Handles
config: VeilidConfig,
2023-05-29 19:24:57 +00:00
storage_manager: StorageManager,
2022-10-10 02:07:15 +00:00
protected_store: ProtectedStore,
table_store: TableStore,
2023-07-04 04:24:55 +00:00
#[cfg(feature = "unstable-blockstore")]
2022-10-10 02:07:15 +00:00
block_store: BlockStore,
crypto: Crypto,
2022-09-08 01:52:08 +00:00
// Accessors
routing_table: RwLock<Option<RoutingTable>>,
2023-07-15 23:32:53 +00:00
address_filter: RwLock<Option<AddressFilter>>,
2022-09-08 01:52:08 +00:00
components: RwLock<Option<NetworkComponents>>,
update_callback: RwLock<Option<UpdateCallback>>,
2022-03-19 22:19:40 +00:00
// Background processes
2022-07-10 21:36:50 +00:00
rolling_transfers_task: TickTask<EyreReport>,
2022-08-27 02:52:08 +00:00
public_address_check_task: TickTask<EyreReport>,
2023-07-02 04:17:04 +00:00
address_filter_task: TickTask<EyreReport>,
2023-06-24 01:12:48 +00:00
// Network Key
network_key: Option<SharedSecret>,
2021-11-22 16:28:30 +00:00
}
#[derive(Clone)]
pub struct NetworkManager {
inner: Arc<Mutex<NetworkManagerInner>>,
2022-03-19 22:19:40 +00:00
unlocked_inner: Arc<NetworkManagerUnlockedInner>,
2021-11-22 16:28:30 +00:00
}
impl NetworkManager {
fn new_inner() -> NetworkManagerInner {
NetworkManagerInner {
2022-03-19 22:19:40 +00:00
stats: NetworkManagerStats::default(),
client_whitelist: LruCache::new_unbounded(),
node_contact_method_cache: LruCache::new(NODE_CONTACT_METHOD_CACHE_SIZE),
2022-08-09 00:42:27 +00:00
public_address_check_cache: BTreeMap::new(),
2022-08-27 02:52:08 +00:00
public_address_inconsistencies_table: BTreeMap::new(),
2022-03-19 22:19:40 +00:00
}
}
2022-10-10 02:07:15 +00:00
fn new_unlocked_inner(
config: VeilidConfig,
2023-05-29 19:24:57 +00:00
storage_manager: StorageManager,
2022-10-10 02:07:15 +00:00
protected_store: ProtectedStore,
table_store: TableStore,
2023-07-04 04:24:55 +00:00
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
2022-10-10 02:07:15 +00:00
crypto: Crypto,
2023-06-24 01:12:48 +00:00
network_key: Option<SharedSecret>,
2022-10-10 02:07:15 +00:00
) -> NetworkManagerUnlockedInner {
2022-03-19 22:19:40 +00:00
NetworkManagerUnlockedInner {
2023-07-02 04:17:04 +00:00
config: config.clone(),
2023-05-29 19:24:57 +00:00
storage_manager,
2022-10-10 02:07:15 +00:00
protected_store,
table_store,
2023-07-04 04:24:55 +00:00
#[cfg(feature = "unstable-blockstore")]
2022-10-10 02:07:15 +00:00
block_store,
crypto,
2023-07-15 23:32:53 +00:00
address_filter: RwLock::new(None),
2022-09-08 01:52:08 +00:00
routing_table: RwLock::new(None),
components: RwLock::new(None),
update_callback: RwLock::new(None),
2022-03-19 22:19:40 +00:00
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
2022-08-27 02:52:08 +00:00
public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS),
2023-07-02 04:17:04 +00:00
address_filter_task: TickTask::new(ADDRESS_FILTER_TASK_INTERVAL_SECS),
2023-06-24 01:12:48 +00:00
network_key,
}
2021-11-22 16:28:30 +00:00
}
2022-10-10 02:07:15 +00:00
pub fn new(
config: VeilidConfig,
2023-05-29 19:24:57 +00:00
storage_manager: StorageManager,
2022-10-10 02:07:15 +00:00
protected_store: ProtectedStore,
table_store: TableStore,
2023-07-04 04:24:55 +00:00
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
2022-10-10 02:07:15 +00:00
crypto: Crypto,
) -> Self {
2023-06-24 01:12:48 +00:00
// Make the network key
let network_key = {
let c = config.get();
let network_key_password = if let Some(nkp) = c.network.network_key_password.clone() {
Some(nkp)
} else {
2023-07-04 04:24:55 +00:00
if c.network
.routing_table
.bootstrap
.contains(&"bootstrap.veilid.net".to_owned())
{
2023-06-24 01:12:48 +00:00
None
} else {
Some(c.network.routing_table.bootstrap.join(","))
}
};
let network_key = if let Some(network_key_password) = network_key_password {
2023-06-25 01:23:48 +00:00
if !network_key_password.is_empty() {
info!("Using network key");
2023-06-24 01:12:48 +00:00
2023-06-25 01:23:48 +00:00
let bcs = crypto.best();
// Yes the use of the salt this way is generally bad, but this just needs to be hashed
2023-07-04 04:24:55 +00:00
Some(
bcs.derive_shared_secret(
network_key_password.as_bytes(),
network_key_password.as_bytes(),
)
.expect("failed to derive network key"),
)
2023-06-25 01:23:48 +00:00
} else {
None
}
2023-06-24 01:12:48 +00:00
} else {
None
};
network_key
};
2022-03-19 22:19:40 +00:00
let this = Self {
2021-11-22 16:28:30 +00:00
inner: Arc::new(Mutex::new(Self::new_inner())),
2022-10-10 02:07:15 +00:00
unlocked_inner: Arc::new(Self::new_unlocked_inner(
config,
2023-05-29 19:24:57 +00:00
storage_manager,
2022-10-10 02:07:15 +00:00
protected_store,
table_store,
2023-07-04 04:24:55 +00:00
#[cfg(feature = "unstable-blockstore")]
2022-10-10 02:07:15 +00:00
block_store,
crypto,
2023-06-24 01:12:48 +00:00
network_key,
2022-10-10 02:07:15 +00:00
)),
2022-03-19 22:19:40 +00:00
};
2022-11-25 01:17:54 +00:00
2023-05-29 19:24:57 +00:00
this.setup_tasks();
2022-11-25 01:17:54 +00:00
2022-03-19 22:19:40 +00:00
this
2021-11-22 16:28:30 +00:00
}
pub fn config(&self) -> VeilidConfig {
2022-10-10 02:07:15 +00:00
self.unlocked_inner.config.clone()
}
/// Runs `f` against the current configuration contents and returns its result.
///
/// The value obtained from `config.get()` is kept alive for the duration of the call.
pub fn with_config<F, R>(&self, f: F) -> R
where
    F: FnOnce(&VeilidConfigInner) -> R,
{
    let config_guard = self.unlocked_inner.config.get();
    f(&*config_guard)
}
2023-05-29 19:24:57 +00:00
pub fn storage_manager(&self) -> StorageManager {
self.unlocked_inner.storage_manager.clone()
}
2022-10-10 02:07:15 +00:00
/// Returns a clone of the protected store handle.
pub fn protected_store(&self) -> ProtectedStore {
    let unlocked = &self.unlocked_inner;
    unlocked.protected_store.clone()
}
/// Returns a clone of the table store handle.
pub fn table_store(&self) -> TableStore {
    let unlocked = &self.unlocked_inner;
    unlocked.table_store.clone()
}
2023-07-04 04:24:55 +00:00
/// Returns a clone of the block store handle.
///
/// Only available with the `unstable-blockstore` feature.
#[cfg(feature = "unstable-blockstore")]
pub fn block_store(&self) -> BlockStore {
    let unlocked = &self.unlocked_inner;
    unlocked.block_store.clone()
}
pub fn crypto(&self) -> Crypto {
2022-10-10 02:07:15 +00:00
self.unlocked_inner.crypto.clone()
2021-11-22 16:28:30 +00:00
}
2023-07-02 04:17:04 +00:00
/// Returns a clone of the address filter.
///
/// # Panics
///
/// Panics if the address filter slot is empty (it is filled by `init`).
pub fn address_filter(&self) -> AddressFilter {
    let slot = self.unlocked_inner.address_filter.read();
    slot.as_ref().unwrap().clone()
}
2021-11-22 16:28:30 +00:00
/// Returns a clone of the routing table handle.
///
/// # Panics
///
/// Panics if the routing table slot is empty (it is filled by `init` and
/// emptied by `terminate`).
pub fn routing_table(&self) -> RoutingTable {
    let slot = self.unlocked_inner.routing_table.read();
    slot.as_ref().unwrap().clone()
}
/// Returns a clone of the low-level network handle.
///
/// # Panics
///
/// Panics if the network components are not present (they are created by
/// `internal_startup` and removed by `shutdown`).
pub fn net(&self) -> Network {
    let components = self.unlocked_inner.components.read();
    components.as_ref().unwrap().net.clone()
}
/// Returns a clone of the RPC processor handle.
///
/// # Panics
///
/// Panics if the network components are not present (they are created by
/// `internal_startup` and removed by `shutdown`).
pub fn rpc_processor(&self) -> RPCProcessor {
    let components = self.unlocked_inner.components.read();
    components.as_ref().unwrap().rpc_processor.clone()
}
/// Returns a clone of the receipt manager handle.
///
/// # Panics
///
/// Panics if the network components are not present (they are created by
/// `internal_startup` and removed by `shutdown`).
pub fn receipt_manager(&self) -> ReceiptManager {
    let components = self.unlocked_inner.components.read();
    components.as_ref().unwrap().receipt_manager.clone()
}
/// Returns a clone of the connection manager handle.
///
/// # Panics
///
/// Panics if the network components are not present (they are created by
/// `internal_startup` and removed by `shutdown`).
pub fn connection_manager(&self) -> ConnectionManager {
    let components = self.unlocked_inner.components.read();
    components.as_ref().unwrap().connection_manager.clone()
}
2022-11-25 19:21:55 +00:00
/// Returns a clone of the update callback.
///
/// # Panics
///
/// Panics if no update callback is stored (it is set by `init` and cleared
/// by `terminate`).
pub fn update_callback(&self) -> UpdateCallback {
    let slot = self.unlocked_inner.update_callback.read();
    slot.as_ref().unwrap().clone()
}
2021-11-22 16:28:30 +00:00
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip_all, err)]
2022-07-10 21:36:50 +00:00
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
2021-11-22 16:28:30 +00:00
let routing_table = RoutingTable::new(self.clone());
routing_table.init().await?;
2023-07-15 23:32:53 +00:00
let address_filter = AddressFilter::new(self.config(), routing_table.clone());
2022-09-08 01:52:08 +00:00
*self.unlocked_inner.routing_table.write() = Some(routing_table.clone());
2023-07-15 23:32:53 +00:00
*self.unlocked_inner.address_filter.write() = Some(address_filter);
2022-09-08 01:52:08 +00:00
*self.unlocked_inner.update_callback.write() = Some(update_callback);
2021-11-22 16:28:30 +00:00
Ok(())
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip_all)]
2021-11-22 16:28:30 +00:00
pub async fn terminate(&self) {
2022-09-08 01:52:08 +00:00
let routing_table = self.unlocked_inner.routing_table.write().take();
2022-02-07 02:18:42 +00:00
if let Some(routing_table) = routing_table {
2021-11-22 16:28:30 +00:00
routing_table.terminate().await;
}
2022-09-08 01:52:08 +00:00
*self.unlocked_inner.update_callback.write() = None;
2021-11-22 16:28:30 +00:00
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip_all, err)]
2022-07-10 21:36:50 +00:00
pub async fn internal_startup(&self) -> EyreResult<()> {
2021-11-22 16:28:30 +00:00
trace!("NetworkManager::internal_startup begin");
2022-09-08 01:52:08 +00:00
if self.unlocked_inner.components.read().is_some() {
2021-11-22 16:28:30 +00:00
debug!("NetworkManager::internal_startup already started");
return Ok(());
}
// Create network components
let connection_manager = ConnectionManager::new(self.clone());
2022-07-06 18:11:44 +00:00
let net = Network::new(
self.clone(),
self.routing_table(),
connection_manager.clone(),
);
let rpc_processor = RPCProcessor::new(
self.clone(),
self.unlocked_inner
.update_callback
.read()
.as_ref()
.unwrap()
.clone(),
);
2021-11-22 16:28:30 +00:00
let receipt_manager = ReceiptManager::new(self.clone());
2022-09-08 01:52:08 +00:00
*self.unlocked_inner.components.write() = Some(NetworkComponents {
2021-11-22 16:28:30 +00:00
net: net.clone(),
connection_manager: connection_manager.clone(),
2021-11-22 16:28:30 +00:00
rpc_processor: rpc_processor.clone(),
receipt_manager: receipt_manager.clone(),
});
// Start network components
2022-06-13 00:58:02 +00:00
connection_manager.startup().await;
net.startup().await?;
2021-11-22 16:28:30 +00:00
rpc_processor.startup().await?;
receipt_manager.startup().await?;
trace!("NetworkManager::internal_startup end");
Ok(())
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip_all, err)]
2022-07-10 21:36:50 +00:00
pub async fn startup(&self) -> EyreResult<()> {
2021-11-22 16:28:30 +00:00
if let Err(e) = self.internal_startup().await {
self.shutdown().await;
return Err(e);
}
2022-05-16 15:52:48 +00:00
2022-07-22 17:05:28 +00:00
// Inform api clients that things have changed
2022-05-16 15:52:48 +00:00
self.send_network_update();
2021-11-22 16:28:30 +00:00
Ok(())
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip_all)]
2021-11-22 16:28:30 +00:00
pub async fn shutdown(&self) {
2022-06-15 18:05:04 +00:00
debug!("starting network manager shutdown");
2021-11-22 16:28:30 +00:00
2022-05-25 15:12:19 +00:00
// Cancel all tasks
2023-05-29 19:24:57 +00:00
self.cancel_tasks().await;
2022-05-25 15:12:19 +00:00
2021-11-22 16:28:30 +00:00
// Shutdown network components if they started up
2022-06-15 18:05:04 +00:00
debug!("shutting down network components");
2022-09-08 01:52:08 +00:00
let components = self.unlocked_inner.components.read().clone();
2021-11-22 16:28:30 +00:00
if let Some(components) = components {
2022-06-13 00:58:02 +00:00
components.net.shutdown().await;
2022-06-15 18:05:04 +00:00
components.rpc_processor.shutdown().await;
components.receipt_manager.shutdown().await;
2022-06-29 14:13:49 +00:00
components.connection_manager.shutdown().await;
2022-09-08 01:52:08 +00:00
*self.unlocked_inner.components.write() = None;
2021-11-22 16:28:30 +00:00
}
// reset the state
2022-06-15 18:05:04 +00:00
debug!("resetting network manager state");
2022-05-16 15:52:48 +00:00
{
2022-10-09 18:59:01 +00:00
*self.inner.lock() = NetworkManager::new_inner();
2022-05-16 15:52:48 +00:00
}
// send update
2022-09-08 01:52:08 +00:00
debug!("sending network state update to api clients");
2022-05-16 15:52:48 +00:00
self.send_network_update();
2021-11-22 16:28:30 +00:00
2022-06-15 18:05:04 +00:00
debug!("finished network manager shutdown");
2021-11-22 16:28:30 +00:00
}
2023-02-11 20:54:55 +00:00
pub fn update_client_whitelist(&self, client: TypedKey) {
let mut inner = self.inner.lock();
2023-07-04 04:24:55 +00:00
match inner.client_whitelist.entry(client, |_k, _v| {
2023-02-26 03:05:44 +00:00
// do nothing on LRU evict
}) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
2022-12-17 01:07:28 +00:00
entry.get_mut().last_seen_ts = get_aligned_timestamp()
}
hashlink::lru_cache::Entry::Vacant(entry) => {
entry.insert(ClientWhitelistEntry {
2022-12-17 01:07:28 +00:00
last_seen_ts: get_aligned_timestamp(),
});
}
}
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self), ret)]
2023-02-11 20:54:55 +00:00
pub fn check_client_whitelist(&self, client: TypedKey) -> bool {
let mut inner = self.inner.lock();
2023-07-04 04:24:55 +00:00
match inner.client_whitelist.entry(client, |_k, _v| {
2023-02-26 03:05:44 +00:00
// do nothing on LRU evict
}) {
hashlink::lru_cache::Entry::Occupied(mut entry) => {
2022-12-17 01:07:28 +00:00
entry.get_mut().last_seen_ts = get_aligned_timestamp();
true
}
hashlink::lru_cache::Entry::Vacant(_) => false,
}
}
pub fn purge_client_whitelist(&self) {
2022-10-10 02:07:15 +00:00
let timeout_ms = self.with_config(|c| c.network.client_whitelist_timeout_ms);
let mut inner = self.inner.lock();
2023-07-04 04:24:55 +00:00
let cutoff_timestamp =
get_aligned_timestamp() - TimestampDuration::new((timeout_ms as u64) * 1000u64);
// Remove clients from the whitelist that haven't been since since our whitelist timeout
while inner
.client_whitelist
.peek_lru()
.map(|v| v.1.last_seen_ts < cutoff_timestamp)
.unwrap_or_default()
{
2022-06-15 19:03:13 +00:00
let (k, v) = inner.client_whitelist.remove_lru().unwrap();
trace!(key=?k, value=?v, "purge_client_whitelist: remove_lru")
}
}
2022-07-22 17:05:28 +00:00
pub fn needs_restart(&self) -> bool {
let net = self.net();
net.needs_restart()
2022-06-10 21:07:10 +00:00
}
2023-07-04 04:24:55 +00:00
pub fn generate_node_status(&self, _routing_domain: RoutingDomain) -> NodeStatus {
NodeStatus {}
2022-09-03 17:57:25 +00:00
}
2022-10-19 01:53:45 +00:00
/// Generates a multi-shot/normal receipt
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self, extra_data, callback), err)]
2022-05-28 20:11:50 +00:00
pub fn generate_receipt<D: AsRef<[u8]>>(
2021-11-22 16:28:30 +00:00
&self,
expiration_us: u64,
expected_returns: u32,
2022-05-28 20:11:50 +00:00
extra_data: D,
2021-11-22 16:28:30 +00:00
callback: impl ReceiptCallback,
2022-07-10 21:36:50 +00:00
) -> EyreResult<Vec<u8>> {
2021-11-22 16:28:30 +00:00
let receipt_manager = self.receipt_manager();
2022-05-28 20:11:50 +00:00
let routing_table = self.routing_table();
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
// Generate receipt and serialized form to return
2023-02-11 20:54:55 +00:00
let vcrypto = self.crypto().best();
2023-07-04 04:24:55 +00:00
2023-02-11 20:54:55 +00:00
let nonce = vcrypto.random_nonce();
let node_id = routing_table.node_id(vcrypto.kind());
2023-03-03 15:55:31 +00:00
let node_id_secret = routing_table.node_id_secret_key(vcrypto.kind());
2023-07-04 04:24:55 +00:00
let receipt = Receipt::try_new(
best_envelope_version(),
node_id.kind,
nonce,
node_id.value,
extra_data,
)?;
2022-05-28 20:11:50 +00:00
let out = receipt
2023-02-11 20:54:55 +00:00
.to_signed_data(self.crypto(), &node_id_secret)
2022-07-10 21:36:50 +00:00
.wrap_err("failed to generate signed receipt")?;
2021-11-22 16:28:30 +00:00
// Record the receipt for later
2022-12-17 01:07:28 +00:00
let exp_ts = get_aligned_timestamp() + expiration_us;
2022-05-28 20:11:50 +00:00
receipt_manager.record_receipt(receipt, exp_ts, expected_returns, callback);
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
Ok(out)
2021-11-22 16:28:30 +00:00
}
2022-10-19 01:53:45 +00:00
/// Generates a single-shot/normal receipt
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self, extra_data), err)]
2022-05-28 20:11:50 +00:00
pub fn generate_single_shot_receipt<D: AsRef<[u8]>>(
2021-11-22 16:28:30 +00:00
&self,
expiration_us: u64,
2022-05-28 20:11:50 +00:00
extra_data: D,
2022-07-10 21:36:50 +00:00
) -> EyreResult<(Vec<u8>, EventualValueFuture<ReceiptEvent>)> {
2021-11-22 16:28:30 +00:00
let receipt_manager = self.receipt_manager();
2022-05-28 20:11:50 +00:00
let routing_table = self.routing_table();
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
// Generate receipt and serialized form to return
2023-02-11 20:54:55 +00:00
let vcrypto = self.crypto().best();
2023-07-04 04:24:55 +00:00
2023-02-11 20:54:55 +00:00
let nonce = vcrypto.random_nonce();
let node_id = routing_table.node_id(vcrypto.kind());
2023-03-03 15:55:31 +00:00
let node_id_secret = routing_table.node_id_secret_key(vcrypto.kind());
2023-07-04 04:24:55 +00:00
let receipt = Receipt::try_new(
best_envelope_version(),
node_id.kind,
nonce,
node_id.value,
extra_data,
)?;
2022-05-28 20:11:50 +00:00
let out = receipt
2023-02-11 20:54:55 +00:00
.to_signed_data(self.crypto(), &node_id_secret)
2022-07-10 21:36:50 +00:00
.wrap_err("failed to generate signed receipt")?;
2021-11-22 16:28:30 +00:00
// Record the receipt for later
2022-12-17 01:07:28 +00:00
let exp_ts = get_aligned_timestamp() + expiration_us;
let eventual = SingleShotEventual::new(Some(ReceiptEvent::Cancelled));
2021-11-22 16:28:30 +00:00
let instance = eventual.instance();
2022-05-28 20:11:50 +00:00
receipt_manager.record_single_shot_receipt(receipt, exp_ts, eventual);
2021-11-22 16:28:30 +00:00
2022-05-28 20:11:50 +00:00
Ok((out, instance))
2021-11-22 16:28:30 +00:00
}
2022-10-19 01:53:45 +00:00
/// Process a received out-of-band receipt
2022-07-20 13:39:38 +00:00
#[instrument(level = "trace", skip(self, receipt_data), ret)]
2022-04-17 23:10:10 +00:00
pub async fn handle_out_of_band_receipt<R: AsRef<[u8]>>(
2022-04-17 17:28:39 +00:00
&self,
receipt_data: R,
2022-07-20 13:39:38 +00:00
) -> NetworkResult<()> {
2022-04-17 17:28:39 +00:00
let receipt_manager = self.receipt_manager();
2023-02-11 20:54:55 +00:00
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
2022-07-20 13:39:38 +00:00
Err(e) => {
return NetworkResult::invalid_message(e.to_string());
}
Ok(v) => v,
};
2022-04-17 17:28:39 +00:00
2022-10-31 03:23:12 +00:00
receipt_manager
.handle_receipt(receipt, ReceiptReturned::OutOfBand)
.await
2022-04-17 17:28:39 +00:00
}
2022-10-19 01:53:45 +00:00
/// Process a received in-band receipt
2022-07-20 13:39:38 +00:00
#[instrument(level = "trace", skip(self, receipt_data), ret)]
2022-05-28 20:11:50 +00:00
pub async fn handle_in_band_receipt<R: AsRef<[u8]>>(
2022-04-17 17:28:39 +00:00
&self,
2022-05-28 20:11:50 +00:00
receipt_data: R,
2022-10-31 03:23:12 +00:00
inbound_noderef: NodeRef,
) -> NetworkResult<()> {
let receipt_manager = self.receipt_manager();
2023-02-11 20:54:55 +00:00
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
2022-10-31 03:23:12 +00:00
Err(e) => {
return NetworkResult::invalid_message(e.to_string());
}
Ok(v) => v,
};
receipt_manager
.handle_receipt(receipt, ReceiptReturned::InBand { inbound_noderef })
.await
}
2022-11-02 19:36:01 +00:00
/// Process a received safety receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
pub async fn handle_safety_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
) -> NetworkResult<()> {
let receipt_manager = self.receipt_manager();
2023-02-11 20:54:55 +00:00
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
2022-11-02 19:36:01 +00:00
Err(e) => {
return NetworkResult::invalid_message(e.to_string());
}
Ok(v) => v,
};
receipt_manager
.handle_receipt(receipt, ReceiptReturned::Safety)
.await
}
2022-10-31 03:23:12 +00:00
/// Process a received private receipt
#[instrument(level = "trace", skip(self, receipt_data), ret)]
pub async fn handle_private_receipt<R: AsRef<[u8]>>(
&self,
receipt_data: R,
2023-02-21 01:37:52 +00:00
private_route: PublicKey,
2022-07-20 13:39:38 +00:00
) -> NetworkResult<()> {
2021-11-22 16:28:30 +00:00
let receipt_manager = self.receipt_manager();
2022-04-17 17:28:39 +00:00
2023-02-11 20:54:55 +00:00
let receipt = match Receipt::from_signed_data(self.crypto(), receipt_data.as_ref()) {
2022-07-20 13:39:38 +00:00
Err(e) => {
return NetworkResult::invalid_message(e.to_string());
}
Ok(v) => v,
};
2022-05-28 20:11:50 +00:00
2022-05-28 14:07:57 +00:00
receipt_manager
2022-11-02 01:05:48 +00:00
.handle_receipt(receipt, ReceiptReturned::Private { private_route })
2022-05-28 14:07:57 +00:00
.await
2021-11-22 16:28:30 +00:00
}
2022-04-17 23:10:10 +00:00
// Process a received signal
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self), err)]
2023-07-16 04:21:19 +00:00
pub async fn handle_signal(
&self,
connection_descriptor: ConnectionDescriptor,
signal_info: SignalInfo,
) -> EyreResult<NetworkResult<()>> {
2022-04-17 23:10:10 +00:00
match signal_info {
2022-05-28 20:11:50 +00:00
SignalInfo::ReverseConnect { receipt, peer_info } => {
2022-04-17 23:10:10 +00:00
let routing_table = self.routing_table();
let rpc = self.rpc_processor();
// Add the peer info to our routing table
2023-02-10 02:01:04 +00:00
let peer_nr = match routing_table.register_node_with_peer_info(
2022-08-31 01:21:16 +00:00
RoutingDomain::PublicInternet,
2023-02-11 20:54:55 +00:00
peer_info,
2022-08-27 02:52:08 +00:00
false,
2022-07-20 13:39:38 +00:00
) {
2023-06-16 00:22:54 +00:00
Ok(nr) => nr,
Err(e) => {
2023-07-04 04:24:55 +00:00
return Ok(NetworkResult::invalid_message(format!(
"unable to register reverse connect peerinfo: {}",
e
)));
2022-07-20 13:39:38 +00:00
}
};
2022-04-17 23:10:10 +00:00
2023-07-16 04:21:19 +00:00
// Restrict reverse connection to same protocol as inbound signal
let peer_nr = peer_nr
.filtered_clone(NodeRefFilter::from(connection_descriptor.protocol_type()));
2022-04-17 23:10:10 +00:00
// Make a reverse connection to the peer and send the receipt to it
2022-09-04 18:17:28 +00:00
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
2022-04-17 23:10:10 +00:00
.await
2022-07-20 13:39:38 +00:00
.wrap_err("rpc failure")
2022-04-17 23:10:10 +00:00
}
2022-05-28 20:11:50 +00:00
SignalInfo::HolePunch { receipt, peer_info } => {
2022-04-17 23:10:10 +00:00
let routing_table = self.routing_table();
2022-05-28 14:07:57 +00:00
let rpc = self.rpc_processor();
2022-04-17 23:10:10 +00:00
// Add the peer info to our routing table
2023-02-10 02:01:04 +00:00
let mut peer_nr = match routing_table.register_node_with_peer_info(
2022-08-31 01:21:16 +00:00
RoutingDomain::PublicInternet,
2023-02-11 20:54:55 +00:00
peer_info,
2022-08-27 02:52:08 +00:00
false,
2022-07-20 13:39:38 +00:00
) {
2023-06-16 00:22:54 +00:00
Ok(nr) => nr,
Err(e) => {
2023-07-04 04:24:55 +00:00
return Ok(NetworkResult::invalid_message(format!(
"unable to register hole punch connect peerinfo: {}",
e
)));
2022-07-20 13:39:38 +00:00
}
};
2022-04-17 23:10:10 +00:00
// Get the udp direct dialinfo for the hole punch
2022-10-09 18:59:01 +00:00
let outbound_nrf = routing_table
2022-09-04 18:17:28 +00:00
.get_outbound_node_ref_filter(RoutingDomain::PublicInternet)
2022-08-02 01:06:31 +00:00
.with_protocol_type(ProtocolType::UDP);
2022-09-04 18:17:28 +00:00
peer_nr.set_filter(Some(outbound_nrf));
2022-04-25 00:16:13 +00:00
let hole_punch_dial_info_detail = peer_nr
2022-09-04 18:17:28 +00:00
.first_filtered_dial_info_detail()
2022-07-10 21:36:50 +00:00
.ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?;
2022-04-17 23:10:10 +00:00
2022-05-28 14:07:57 +00:00
// Now that we picked a specific dialinfo, further restrict the noderef to the specific address type
2022-08-02 01:06:31 +00:00
let filter = peer_nr.take_filter().unwrap();
let filter =
filter.with_address_type(hole_punch_dial_info_detail.dial_info.address_type());
2022-05-28 14:07:57 +00:00
peer_nr.set_filter(Some(filter));
2022-04-17 23:10:10 +00:00
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
2022-08-06 16:36:07 +00:00
let connection_descriptor = network_result_try!(
2022-07-20 13:39:38 +00:00
self.net()
.send_data_to_dial_info(
hole_punch_dial_info_detail.dial_info.clone(),
Vec::new(),
)
.await?
);
2022-04-17 23:10:10 +00:00
// XXX: do we need a delay here? or another hole punch packet?
2022-08-06 16:36:07 +00:00
// Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch
2022-12-17 01:07:28 +00:00
peer_nr.set_last_connection(connection_descriptor, get_aligned_timestamp());
2022-08-06 16:36:07 +00:00
2022-05-28 14:07:57 +00:00
// Return the receipt using the same dial info send the receipt to it
2022-09-04 18:17:28 +00:00
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
2022-04-17 23:10:10 +00:00
.await
2022-07-20 13:39:38 +00:00
.wrap_err("rpc failure")
2022-04-17 23:10:10 +00:00
}
}
}
2022-10-19 01:53:45 +00:00
/// Builds an envelope for sending over the network
2023-07-04 04:24:55 +00:00
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, body), err)
)]
2021-11-22 16:28:30 +00:00
fn build_envelope<B: AsRef<[u8]>>(
&self,
2023-02-11 20:54:55 +00:00
dest_node_id: TypedKey,
2021-11-22 16:28:30 +00:00
version: u8,
body: B,
2022-07-10 21:36:50 +00:00
) -> EyreResult<Vec<u8>> {
2021-11-22 16:28:30 +00:00
// DH to get encryption key
let routing_table = self.routing_table();
2023-02-11 20:54:55 +00:00
let Some(vcrypto) = self.crypto().get(dest_node_id.kind) else {
bail!("should not have a destination with incompatible crypto here");
};
let node_id = routing_table.node_id(vcrypto.kind());
2023-03-03 15:55:31 +00:00
let node_id_secret = routing_table.node_id_secret_key(vcrypto.kind());
2021-11-22 16:28:30 +00:00
// Get timestamp, nonce
2022-12-17 01:07:28 +00:00
let ts = get_aligned_timestamp();
2023-02-11 20:54:55 +00:00
let nonce = vcrypto.random_nonce();
2021-11-22 16:28:30 +00:00
// Encode envelope
2023-07-04 04:24:55 +00:00
let envelope = Envelope::new(
version,
node_id.kind,
ts,
nonce,
node_id.value,
dest_node_id.value,
);
2021-11-22 16:28:30 +00:00
envelope
2023-07-04 04:24:55 +00:00
.to_encrypted_data(
self.crypto(),
body.as_ref(),
&node_id_secret,
&self.unlocked_inner.network_key,
)
2022-07-10 21:36:50 +00:00
.wrap_err("envelope failed to encode")
2021-11-22 16:28:30 +00:00
}
2022-10-19 01:53:45 +00:00
/// Called by the RPC handler when we want to issue an RPC request or response
/// node_ref is the direct destination to which the envelope will be sent
2023-02-26 23:11:10 +00:00
/// If 'destination_node_ref' is specified, it can be different than the node_ref being sent to
2022-10-19 01:53:45 +00:00
/// which will cause the envelope to be relayed
2023-07-04 04:24:55 +00:00
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, body), ret, err)
)]
2021-11-22 16:28:30 +00:00
pub async fn send_envelope<B: AsRef<[u8]>>(
&self,
node_ref: NodeRef,
2023-02-26 23:11:10 +00:00
destination_node_ref: Option<NodeRef>,
2021-11-22 16:28:30 +00:00
body: B,
2022-07-20 13:39:38 +00:00
) -> EyreResult<NetworkResult<SendDataKind>> {
2023-02-26 23:11:10 +00:00
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
let best_node_id = destination_node_ref.best_node_id();
2023-02-17 22:47:21 +00:00
2023-02-14 02:12:27 +00:00
// Get node's envelope versions and see if we can send to it
2021-11-22 16:28:30 +00:00
// and if so, get the max version we can use
2023-02-26 23:11:10 +00:00
let Some(envelope_version) = destination_node_ref.best_envelope_version() else {
2023-02-12 04:16:32 +00:00
bail!(
"can't talk to this node {} because we dont support its envelope versions",
node_ref
);
2021-11-22 16:28:30 +00:00
};
// Build the envelope to send
2023-06-25 01:23:48 +00:00
let out = self.build_envelope(best_node_id, envelope_version, body)?;
2021-11-22 16:28:30 +00:00
2023-07-13 18:16:14 +00:00
if !node_ref.same_entry(&destination_node_ref) {
log_net!(
"sending envelope to {:?} via {:?}, len={}",
destination_node_ref,
node_ref,
out.len()
);
} else {
log_net!("sending envelope to {:?}, len={}", node_ref, out.len());
}
2022-04-16 15:18:54 +00:00
// Send the envelope via whatever means necessary
2023-02-17 22:47:21 +00:00
self.send_data(node_ref, out).await
2021-11-22 16:28:30 +00:00
}
2022-10-19 01:53:45 +00:00
/// Called by the RPC handler when we want to issue a direct (out-of-band) receipt.
///
/// Sends `rcpt_data` as-is to `dial_info` via `send_data_unbound_to_dial_info`.
///
/// Returns `Ok(())` both when the send produced a value and when the
/// `network_result_value_or_log!` fallback block runs; only an `Err` from the
/// send itself is propagated (through `?`).
#[instrument(level = "debug", skip(self, rcpt_data), err)]
pub async fn send_out_of_band_receipt(
    &self,
    dial_info: DialInfo,
    rcpt_data: Vec<u8>,
) -> EyreResult<()> {
    // Do we need to validate the outgoing receipt? Probably not
    // because it is supposed to be opaque and the
    // recipient/originator does the validation
    // Also, in the case of an old 'version', returning the receipt
    // should not be subject to our ability to decode it

    // Send receipt directly
    // NOTE(review): `dial_info` and `rcpt_data` are moved into the send call and also
    // used in the extra-message `format!`, so this relies on how the macro orders its
    // expansion -- keep the invocation in this exact form.
    network_result_value_or_log!(self
        .net()
        .send_data_unbound_to_dial_info(dial_info, rcpt_data)
        .await? => [ format!(": dial_info={}, rcpt_data.len={}", dial_info, rcpt_data.len()) ] {
            return Ok(());
        }
    );
    Ok(())
}
// Called when a packet potentially containing an RPC envelope is received by a low-level
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler
2023-06-25 02:59:51 +00:00
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len())))]
2022-06-05 00:18:26 +00:00
async fn on_recv_envelope(
2021-11-22 16:28:30 +00:00
&self,
2023-06-25 01:23:48 +00:00
data: &mut [u8],
2022-08-25 23:21:50 +00:00
connection_descriptor: ConnectionDescriptor,
2022-07-10 21:36:50 +00:00
) -> EyreResult<bool> {
2023-07-04 04:24:55 +00:00
#[cfg(feature = "verbose-tracing")]
2022-07-20 13:39:38 +00:00
let root = span!(
parent: None,
Level::TRACE,
"on_recv_envelope",
"data.len" = data.len(),
2022-08-25 23:21:50 +00:00
"descriptor" = ?connection_descriptor
2022-07-20 13:39:38 +00:00
);
2023-07-04 04:24:55 +00:00
#[cfg(feature = "verbose-tracing")]
2022-07-20 13:39:38 +00:00
let _root_enter = root.enter();
2022-01-05 17:01:02 +00:00
log_net!(
"envelope of {} bytes received from {:?}",
data.len(),
2022-08-25 23:21:50 +00:00
connection_descriptor
2022-01-05 17:01:02 +00:00
);
2023-07-02 04:17:04 +00:00
let remote_addr = connection_descriptor.remote_address().to_ip_addr();
2022-03-20 14:52:03 +00:00
// Network accounting
2023-07-04 04:24:55 +00:00
self.stats_packet_rcvd(remote_addr, ByteCount::new(data.len() as u64));
2022-03-20 14:52:03 +00:00
2022-07-06 18:12:28 +00:00
// If this is a zero length packet, just drop it, because these are used for hole punching
// and possibly other low-level network connectivity tasks and will never require
// more processing or forwarding
if data.len() == 0 {
return Ok(true);
}
2022-05-28 14:07:57 +00:00
// Ensure we can read the magic number
if data.len() < 4 {
2022-08-20 21:08:48 +00:00
log_net!(debug "short packet");
2023-07-15 23:32:53 +00:00
self.address_filter().punish_ip_addr(remote_addr);
2022-07-20 13:39:38 +00:00
return Ok(false);
2022-05-28 14:07:57 +00:00
}
2022-09-03 17:57:25 +00:00
// Get the routing domain for this data
let routing_domain = match self
.routing_table()
2022-09-04 18:17:28 +00:00
.routing_domain_for_address(connection_descriptor.remote_address().address())
2022-09-03 17:57:25 +00:00
{
Some(rd) => rd,
None => {
log_net!(debug "no routing domain for envelope received from {:?}", connection_descriptor);
return Ok(false);
}
};
2022-06-25 14:57:33 +00:00
// Is this a direct bootstrap request instead of an envelope?
if data[0..4] == *BOOT_MAGIC {
2023-06-25 18:09:22 +00:00
network_result_value_or_log!(self.handle_boot_request(connection_descriptor).await? => [ format!(": connection_descriptor={:?}", connection_descriptor) ] {});
2022-06-25 14:57:33 +00:00
return Ok(true);
}
2021-11-22 16:28:30 +00:00
// Is this an out-of-band receipt instead of an envelope?
2023-02-13 21:12:46 +00:00
if data[0..3] == *RECEIPT_MAGIC {
2023-06-25 18:09:22 +00:00
network_result_value_or_log!(self.handle_out_of_band_receipt(data).await => [ format!(": data.len={}", data.len()) ] {});
2021-11-22 16:28:30 +00:00
return Ok(true);
}
// Decode envelope header (may fail signature validation)
2023-07-04 04:24:55 +00:00
let envelope =
match Envelope::from_signed_data(self.crypto(), data, &self.unlocked_inner.network_key)
{
Ok(v) => v,
Err(e) => {
log_net!(debug "envelope failed to decode: {}", e);
2023-07-15 23:32:53 +00:00
self.address_filter().punish_ip_addr(remote_addr);
2023-07-04 04:24:55 +00:00
return Ok(false);
}
};
2021-11-22 16:28:30 +00:00
// Get timestamp range
2022-10-10 02:07:15 +00:00
let (tsbehind, tsahead) = self.with_config(|c| {
2021-11-22 16:28:30 +00:00
(
2023-07-04 04:24:55 +00:00
c.network
.rpc
.max_timestamp_behind_ms
.map(ms_to_us)
.map(TimestampDuration::new),
c.network
.rpc
.max_timestamp_ahead_ms
.map(ms_to_us)
.map(TimestampDuration::new),
2021-11-22 16:28:30 +00:00
)
2022-10-10 02:07:15 +00:00
});
2021-11-22 16:28:30 +00:00
// Validate timestamp isn't too old
2022-12-17 01:07:28 +00:00
let ts = get_aligned_timestamp();
2021-11-22 16:28:30 +00:00
let ets = envelope.get_timestamp();
if let Some(tsbehind) = tsbehind {
2022-12-17 01:07:28 +00:00
if tsbehind.as_u64() != 0 && (ts > ets && ts.saturating_sub(ets) > tsbehind) {
2022-08-20 21:08:48 +00:00
log_net!(debug
2023-06-25 00:23:33 +00:00
"Timestamp behind: {}ms ({})",
timestamp_to_secs(ts.saturating_sub(ets).as_u64()) * 1000f64,
connection_descriptor.remote()
2022-07-10 21:36:50 +00:00
);
2022-08-20 21:08:48 +00:00
return Ok(false);
2021-11-22 16:28:30 +00:00
}
}
if let Some(tsahead) = tsahead {
2022-12-17 01:07:28 +00:00
if tsahead.as_u64() != 0 && (ts < ets && ets.saturating_sub(ts) > tsahead) {
2022-08-20 21:08:48 +00:00
log_net!(debug
2023-06-25 00:23:33 +00:00
"Timestamp ahead: {}ms ({})",
timestamp_to_secs(ets.saturating_sub(ts).as_u64()) * 1000f64,
connection_descriptor.remote()
2022-07-10 21:36:50 +00:00
);
2022-08-20 21:08:48 +00:00
return Ok(false);
2021-11-22 16:28:30 +00:00
}
}
2022-09-08 01:52:08 +00:00
// Get routing table and rpc processor
let routing_table = self.routing_table();
let rpc = self.rpc_processor();
// Peek at header and see if we need to relay this
// If the recipient id is not our node id, then it needs relaying
2023-02-10 02:01:04 +00:00
let sender_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_sender_id());
2023-07-15 23:32:53 +00:00
if self.address_filter().is_node_id_punished(sender_id) {
return Ok(false);
}
2023-02-10 02:01:04 +00:00
let recipient_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_recipient_id());
2023-02-11 20:54:55 +00:00
if !routing_table.matches_own_node_id(&[recipient_id]) {
// See if the source node is allowed to resolve nodes
// This is a costly operation, so only outbound-relay permitted
// nodes are allowed to do this, for example PWA users
2022-07-20 13:39:38 +00:00
let some_relay_nr = if self.check_client_whitelist(sender_id) {
2022-04-16 15:18:54 +00:00
// Full relay allowed, do a full resolve_node
2023-07-04 04:24:55 +00:00
match rpc
.resolve_node(recipient_id, SafetySelection::Unsafe(Sequencing::default()))
.await
{
2022-12-17 18:02:39 +00:00
Ok(v) => v,
Err(e) => {
log_net!(debug "failed to resolve recipient node for relay, dropping outbound relayed packet: {}" ,e);
return Ok(false);
}
}
} else {
// If this is not a node in the client whitelist, only allow inbound relay
// which only performs a lightweight lookup before passing the packet back out
// See if we have the node in our routing table
// We should, because relays are chosen by nodes that have established connectivity and
// should be mutually in each others routing tables. The node needing the relay will be
// pinging this node regularly to keep itself in the routing table
2023-06-16 00:22:54 +00:00
match routing_table.lookup_node_ref(recipient_id) {
Ok(v) => v,
Err(e) => {
log_net!(debug "failed to look up recipient node for relay, dropping outbound relayed packet: {}" ,e);
return Ok(false);
}
}
};
2022-07-20 13:39:38 +00:00
if let Some(relay_nr) = some_relay_nr {
2023-07-20 00:55:37 +00:00
// Ensure the protocol used to forward is of the same sequencing requirement
2023-07-16 04:21:19 +00:00
// Address type is allowed to change if connectivity is better
2023-07-20 00:55:37 +00:00
let relay_nr = if connection_descriptor.protocol_type().is_ordered() {
// XXX: this is a little redundant
let (_, nrf) = NodeRefFilter::new().with_sequencing(Sequencing::EnsureOrdered);
let mut relay_nr = relay_nr.filtered_clone(nrf);
relay_nr.set_sequencing(Sequencing::EnsureOrdered);
relay_nr
} else {
relay_nr
};
2023-06-25 05:23:24 +00:00
2022-07-20 13:39:38 +00:00
// Relay the packet to the desired destination
log_net!("relaying {} bytes to {}", data.len(), relay_nr);
2023-06-24 01:12:48 +00:00
2022-12-17 18:02:39 +00:00
network_result_value_or_log!(match self.send_data(relay_nr, data.to_vec())
.await {
Ok(v) => v,
Err(e) => {
log_net!(debug "failed to forward envelope: {}" ,e);
2023-07-04 04:24:55 +00:00
return Ok(false);
2022-12-17 18:02:39 +00:00
}
2023-06-25 18:09:22 +00:00
} => [ format!(": relay_nr={}, data.len={}", relay_nr, data.len()) ] {
2022-07-20 13:39:38 +00:00
return Ok(false);
}
);
}
// Inform caller that we dealt with the envelope, but did not process it locally
return Ok(false);
}
// DH to get decryption key (cached)
2023-03-03 15:55:31 +00:00
let node_id_secret = routing_table.node_id_secret_key(envelope.get_crypto_kind());
// Decrypt the envelope body
2023-07-04 04:24:55 +00:00
let body = match envelope.decrypt_body(
self.crypto(),
data,
&node_id_secret,
&self.unlocked_inner.network_key,
) {
Ok(v) => v,
Err(e) => {
log_net!(debug "failed to decrypt envelope body: {}",e);
2023-07-15 23:32:53 +00:00
self.address_filter().punish_ip_addr(remote_addr);
2023-07-04 04:24:55 +00:00
return Ok(false);
}
};
2021-11-22 16:28:30 +00:00
// Cache the envelope information in the routing table
2022-07-20 13:39:38 +00:00
let source_noderef = match routing_table.register_node_with_existing_connection(
2023-02-12 04:16:32 +00:00
TypedKey::new(envelope.get_crypto_kind(), envelope.get_sender_id()),
2022-08-25 23:21:50 +00:00
connection_descriptor,
2022-07-10 21:36:50 +00:00
ts,
2022-07-20 13:39:38 +00:00
) {
2023-06-16 00:22:54 +00:00
Ok(v) => v,
Err(e) => {
2022-07-20 13:39:38 +00:00
// If the node couldn't be registered just skip this envelope,
2023-06-16 00:22:54 +00:00
log_net!(debug "failed to register node with existing connection: {}", e);
2022-07-20 13:39:38 +00:00
return Ok(false);
}
};
2023-02-11 20:54:55 +00:00
source_noderef.add_envelope_version(envelope.get_version());
2021-11-22 16:28:30 +00:00
// Pass message to RPC system
2022-10-30 02:15:50 +00:00
rpc.enqueue_direct_message(
2022-09-03 17:57:25 +00:00
envelope,
source_noderef,
connection_descriptor,
routing_domain,
2022-10-30 02:15:50 +00:00
body,
2022-09-03 17:57:25 +00:00
)?;
2021-11-22 16:28:30 +00:00
// Inform caller that we dealt with the envelope locally
Ok(true)
}
2022-03-19 22:19:40 +00:00
2022-04-23 01:30:09 +00:00
/// Determine if a local IP address has changed.
///
/// This would mean we should restart the low level network and recreate all of our dial info,
/// after waiting until we have received confirmation from N different peers.
///
/// Currently a no-op: all parameters are ignored.
pub fn report_local_network_socket_address(
    &self,
    _socket_address: SocketAddress,
    _connection_descriptor: ConnectionDescriptor,
    _reporting_peer: NodeRef,
) {
    // XXX: Nothing here yet.
}
// Determine if a global IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
2022-09-03 17:57:25 +00:00
pub fn report_public_internet_socket_address(
2022-04-23 01:30:09 +00:00
&self,
2022-08-09 00:42:27 +00:00
socket_address: SocketAddress, // the socket address as seen by the remote peer
connection_descriptor: ConnectionDescriptor, // the connection descriptor used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
2022-04-23 01:30:09 +00:00
) {
2023-06-25 22:28:32 +00:00
#[cfg(feature = "verbose-tracing")]
debug!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
2022-08-20 21:08:48 +00:00
2022-08-28 17:13:09 +00:00
// Ignore these reports if we are currently detecting public dial info
2022-09-08 01:52:08 +00:00
let net = self.net();
2022-11-04 23:29:44 +00:00
if net.needs_public_dial_info_check() {
2022-08-28 17:13:09 +00:00
return;
}
2022-09-08 01:52:08 +00:00
let routing_table = self.routing_table();
2022-10-10 02:07:15 +00:00
let (detect_address_changes, ip6_prefix_size) = self.with_config(|c| {
(
c.network.detect_address_changes,
c.network.max_connections_per_ip6_prefix_size as usize,
)
});
2022-08-28 17:13:09 +00:00
// Get the ip(block) this report is coming from
let ipblock = ip_to_ipblock(
ip6_prefix_size,
connection_descriptor.remote_address().to_ip_addr(),
);
// Store the reported address if it isn't denylisted
2022-08-09 00:42:27 +00:00
let key = PublicAddressCheckCacheKey(
connection_descriptor.protocol_type(),
connection_descriptor.address_type(),
);
2022-09-08 01:52:08 +00:00
let mut inner = self.inner.lock();
let inner = &mut *inner;
2022-08-28 17:13:09 +00:00
let pacc = inner
.public_address_check_cache
.entry(key)
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
let pait = inner
.public_address_inconsistencies_table
.entry(key)
.or_insert_with(|| HashMap::new());
if pait.contains_key(&ipblock) {
return;
}
2023-07-04 04:24:55 +00:00
pacc.insert(ipblock, socket_address, |_k, _v| {
2023-02-26 03:05:44 +00:00
// do nothing on LRU evict
});
2022-04-23 01:30:09 +00:00
// Determine if our external address has likely changed
2022-08-27 16:54:09 +00:00
let mut bad_public_address_detection_punishment: Option<
Box<dyn FnOnce() + Send + 'static>,
> = None;
2022-10-09 18:59:01 +00:00
let public_internet_network_class = routing_table
2022-08-31 01:21:16 +00:00
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or(NetworkClass::Invalid);
2022-08-22 17:27:26 +00:00
let needs_public_address_detection =
2022-08-31 01:21:16 +00:00
if matches!(public_internet_network_class, NetworkClass::InboundCapable) {
2022-08-22 17:27:26 +00:00
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed
let dial_info_filter = connection_descriptor.make_dial_info_filter();
// Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table
.all_filtered_dial_info_details(
2022-09-04 18:17:28 +00:00
RoutingDomain::PublicInternet.into(),
2022-08-22 17:27:26 +00:00
&dial_info_filter,
)
.iter()
.map(|did| did.dial_info.socket_address())
.collect();
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
2022-08-27 02:52:08 +00:00
let mut inconsistencies = Vec::new();
2022-08-28 17:13:09 +00:00
2022-08-22 17:27:26 +00:00
// Iteration goes from most recent to least recent node/address pair
2022-08-27 02:52:08 +00:00
for (reporting_ip_block, a) in pacc {
// If this address is not one of our current addresses (inconsistent)
// and we haven't already denylisted the reporting source,
if !current_addresses.contains(a) && !pait.contains_key(reporting_ip_block) {
// Record the origin of the inconsistency
inconsistencies.push(*reporting_ip_block);
2022-08-27 16:54:09 +00:00
}
}
2022-08-27 02:52:08 +00:00
2022-08-27 16:54:09 +00:00
// If we have enough inconsistencies to consider changing our public dial info,
// add them to our denylist (throttling) and go ahead and check for new
// public dialinfo
let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT
{
2022-12-17 01:07:28 +00:00
let exp_ts = get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
2022-08-27 16:54:09 +00:00
for i in &inconsistencies {
pait.insert(*i, exp_ts);
}
2022-08-27 02:52:08 +00:00
2022-08-27 16:54:09 +00:00
// Run this routine if the inconsistent nodes turn out to be lying
let this = self.clone();
bad_public_address_detection_punishment = Some(Box::new(move || {
let mut inner = this.inner.lock();
let pait = inner
.public_address_inconsistencies_table
.entry(key)
.or_insert_with(|| HashMap::new());
2023-07-04 04:24:55 +00:00
let exp_ts = get_aligned_timestamp()
+ PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
2022-08-27 16:54:09 +00:00
for i in inconsistencies {
pait.insert(i, exp_ts);
2022-08-22 17:27:26 +00:00
}
2022-08-27 16:54:09 +00:00
}));
true
} else {
false
};
2022-08-22 17:27:26 +00:00
// // debug code
2022-08-27 16:54:09 +00:00
// if inconsistent {
2022-08-22 17:27:26 +00:00
// trace!("public_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
// .public_address_check_cache, current_addresses, inconsistencies);
// }
2022-08-20 21:08:48 +00:00
2022-08-27 02:52:08 +00:00
inconsistent
2022-12-16 01:52:24 +00:00
} else if matches!(public_internet_network_class, NetworkClass::OutboundOnly) {
2022-08-22 17:27:26 +00:00
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
2022-08-20 21:08:48 +00:00
2022-08-22 17:27:26 +00:00
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
// Iteration goes from most recent to least recent node/address pair
let pacc = inner
.public_address_check_cache
.entry(key)
2022-08-27 02:52:08 +00:00
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
2022-08-22 17:27:26 +00:00
for (_, a) in pacc {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
2022-08-27 02:52:08 +00:00
if consistencies >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
2022-08-22 17:27:26 +00:00
consistent = true;
break;
}
}
2022-08-22 17:27:26 +00:00
} else {
current_address = Some(*a);
}
}
2022-08-22 17:27:26 +00:00
consistent
2022-12-16 01:52:24 +00:00
} else {
// If we are a webapp we never do this.
// If we have invalid network class, then public address detection is already going to happen via the network_class_discovery task
false
2022-08-22 17:27:26 +00:00
};
if needs_public_address_detection {
2022-08-07 18:55:48 +00:00
if detect_address_changes {
// Reset the address check cache now so we can start detecting fresh
info!("Public address has changed, detecting public dial info");
2022-08-07 00:55:24 +00:00
2022-08-07 18:55:48 +00:00
inner.public_address_check_cache.clear();
2022-04-23 01:30:09 +00:00
2022-08-27 16:54:09 +00:00
// Re-detect the public dialinfo
net.set_needs_public_dial_info_check(bad_public_address_detection_punishment);
2022-08-07 18:55:48 +00:00
} else {
warn!("Public address may have changed. Restarting the server may be required.");
2022-08-25 23:29:39 +00:00
warn!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
warn!(
2022-08-25 23:21:50 +00:00
"public_address_check_cache: {:#?}",
inner.public_address_check_cache
);
2022-08-07 18:55:48 +00:00
}
2022-04-23 01:30:09 +00:00
}
}
2021-11-22 16:28:30 +00:00
}