refactor and make tcp work

This commit is contained in:
John Smith 2022-01-05 12:01:02 -05:00
parent 3035bc079f
commit b66aca0ce0
25 changed files with 339 additions and 447 deletions

1
Cargo.lock generated
View File

@ -3743,6 +3743,7 @@ version = "0.1.0"
dependencies = [
"android_logger",
"anyhow",
"async-channel",
"async-lock",
"async-std",
"async-tls",

View File

@ -87,6 +87,7 @@ getrandom = { version = "^0", features = ["js"] }
ws_stream_wasm = "^0"
async_executors = { version = "^0", default-features = false, features = [ "bindgen" ]}
async-lock = "^2"
async-channel = { version = "^1" }
# Configuration for WASM32 'web-sys' crate
[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys]

View File

@ -15,7 +15,7 @@ const CONNECTION_PROCESSOR_CHANNEL_SIZE: usize = 128usize;
struct ConnectionManagerInner {
connection_table: ConnectionTable,
connection_processor_jh: Option<JoinHandle<()>>,
connection_add_channel_tx: Option<utils::channel::Sender<SystemPinBoxFuture<()>>>,
connection_add_channel_tx: Option<async_channel::Sender<SystemPinBoxFuture<()>>>,
}
impl core::fmt::Debug for ConnectionManagerInner {
@ -69,7 +69,7 @@ impl ConnectionManager {
pub async fn startup(&self) {
let mut inner = self.arc.inner.lock().await;
let cac = utils::channel::channel(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config
let cac = async_channel::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config
inner.connection_add_channel_tx = Some(cac.0);
let rx = cac.1.clone();
let this = self.clone();
@ -90,7 +90,7 @@ impl ConnectionManager {
}
// Internal routine to register new connection atomically
async fn on_new_connection_internal(
fn on_new_connection_internal(
&self,
inner: &mut ConnectionManagerInner,
conn: NetworkConnection,
@ -103,7 +103,6 @@ impl ConnectionManager {
let receiver_loop_future = Self::process_connection(self.clone(), conn.clone());
tx.try_send(receiver_loop_future)
.await
.map_err(map_to_string)
.map_err(logthru_net!(error "failed to start receiver loop"))?;
@ -117,7 +116,7 @@ impl ConnectionManager {
// and spawns a message processing loop for the connection
pub async fn on_new_connection(&self, conn: NetworkConnection) -> Result<(), String> {
let mut inner = self.arc.inner.lock().await;
self.on_new_connection_internal(&mut *inner, conn).await
self.on_new_connection_internal(&mut *inner, conn)
}
pub async fn get_or_create_connection(
@ -133,18 +132,21 @@ impl ConnectionManager {
None => ConnectionDescriptor::new_no_local(peer_address),
};
// If connection exists, then return it
// If any connection to this remote exists that has the same protocol, return it
// Any connection will do, we don't have to match the local address
let mut inner = self.arc.inner.lock().await;
if let Some(conn) = inner.connection_table.get_connection(descriptor) {
if let Some(conn) = inner
.connection_table
.get_last_connection_by_remote(descriptor.remote)
{
return Ok(conn);
}
// If not, attempt new connection
let conn = NetworkConnection::connect(local_addr, dial_info).await?;
self.on_new_connection_internal(&mut *inner, conn.clone())
.await?;
self.on_new_connection_internal(&mut *inner, conn.clone())?;
Ok(conn)
}
@ -154,6 +156,7 @@ impl ConnectionManager {
this: ConnectionManager,
conn: NetworkConnection,
) -> SystemPinBoxFuture<()> {
log_net!("Starting process_connection loop for {:?}", conn);
let network_manager = this.network_manager();
Box::pin(async move {
//
@ -162,7 +165,10 @@ impl ConnectionManager {
let res = conn.clone().recv().await;
let message = match res {
Ok(v) => v,
Err(_) => break,
Err(e) => {
log_net!(error e);
break;
}
};
if let Err(e) = network_manager
.on_recv_envelope(message.as_slice(), descriptor)
@ -189,7 +195,7 @@ impl ConnectionManager {
// Process connection oriented sockets in the background
// This never terminates and must have its task cancelled once started
// Task cancellation is performed by shutdown() by dropping the join handle
async fn connection_processor(self, rx: utils::channel::Receiver<SystemPinBoxFuture<()>>) {
async fn connection_processor(self, rx: async_channel::Receiver<SystemPinBoxFuture<()>>) {
let mut connection_futures: FuturesUnordered<SystemPinBoxFuture<()>> =
FuturesUnordered::new();
loop {

View File

@ -1,16 +1,19 @@
use crate::network_connection::*;
use crate::xx::*;
use crate::*;
use alloc::collections::btree_map::Entry;
#[derive(Debug)]
pub struct ConnectionTable {
conn_by_addr: BTreeMap<ConnectionDescriptor, NetworkConnection>,
conn_by_descriptor: BTreeMap<ConnectionDescriptor, NetworkConnection>,
conns_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnection>>,
}
impl ConnectionTable {
pub fn new() -> Self {
Self {
conn_by_addr: BTreeMap::new(),
conn_by_descriptor: BTreeMap::new(),
conns_by_remote: BTreeMap::new(),
}
}
@ -21,31 +24,71 @@ impl ConnectionTable {
ProtocolType::UDP,
"Only connection oriented protocols go in the table!"
);
if self.conn_by_addr.contains_key(&descriptor) {
if self.conn_by_descriptor.contains_key(&descriptor) {
return Err(format!(
"Connection already added to table: {:?}",
descriptor
));
}
let res = self.conn_by_addr.insert(descriptor, conn);
let res = self.conn_by_descriptor.insert(descriptor, conn.clone());
assert!(res.is_none());
let conns = self.conns_by_remote.entry(descriptor.remote).or_default();
warn!("add_connection: {:?}", conn);
conns.push(conn);
Ok(())
}
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<NetworkConnection> {
self.conn_by_addr.get(&descriptor).cloned()
let out = self.conn_by_descriptor.get(&descriptor).cloned();
warn!("get_connection: {:?} -> {:?}", descriptor, out);
out
}
pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<NetworkConnection> {
let out = self
.conns_by_remote
.get(&remote)
.map(|v| v[(v.len() - 1)].clone());
warn!("get_last_connection_by_remote: {:?} -> {:?}", remote, out);
out
}
pub fn connection_count(&self) -> usize {
self.conn_by_addr.len()
self.conn_by_descriptor.len()
}
pub fn remove_connection(
&mut self,
descriptor: ConnectionDescriptor,
) -> Result<NetworkConnection, String> {
self.conn_by_addr
warn!("remove_connection: {:?}", descriptor);
let out = self
.conn_by_descriptor
.remove(&descriptor)
.ok_or_else(|| format!("Connection not in table: {:?}", descriptor))
.ok_or_else(|| format!("Connection not in table: {:?}", descriptor))?;
match self.conns_by_remote.entry(descriptor.remote) {
Entry::Vacant(_) => {
panic!("inconsistency in connection table")
}
Entry::Occupied(mut o) => {
let v = o.get_mut();
// Remove one matching connection from the list
for (n, elem) in v.iter().enumerate() {
if elem.connection_descriptor() == descriptor {
v.remove(n);
break;
}
}
// No connections left for this remote, remove the entry from conns_by_remote
if v.is_empty() {
o.remove_entry();
}
}
}
Ok(out)
}
}

View File

@ -14,7 +14,6 @@ use protocol::tcp::RawTcpProtocolHandler;
use protocol::udp::RawUdpProtocolHandler;
use protocol::ws::WebsocketProtocolHandler;
pub use protocol::*;
use utils::async_peek_stream::*;
use utils::network_interfaces::*;
use async_std::io;
@ -302,6 +301,8 @@ impl Network {
}
// Handle connection-oriented protocols
// Try to send to the exact existing connection if one exists
if let Some(conn) = self.connection_manager().get_connection(descriptor).await {
// connection exists, send over it
conn.send(data).await.map_err(logthru_net!())?;
@ -355,7 +356,8 @@ impl Network {
match self
.clone()
.send_data_to_existing_connection(descriptor, data)
.await?
.await
.map_err(logthru_net!())?
{
None => {
return Ok(());
@ -371,7 +373,9 @@ impl Network {
.best_dial_info()
.ok_or_else(|| "couldn't send data, no dial info or peer address".to_owned())?;
self.send_data_to_dial_info(dial_info, data).await
self.send_data_to_dial_info(dial_info, data)
.await
.map_err(logthru_net!())
}
/////////////////////////////////////////////////////////////////

View File

@ -1,8 +1,6 @@
use super::*;
use crate::intf::*;
use crate::network_connection::*;
use utils::clone_stream::*;
use async_tls::TlsAcceptor;
/////////////////////////////////////////////////////////////////
@ -135,7 +133,7 @@ impl Network {
let addr = match tcp_stream.peer_addr() {
Ok(addr) => addr,
Err(e) => {
error!("failed to get peer address: {}", e);
log_net!(error "failed to get peer address: {}", e);
return;
}
};
@ -159,6 +157,7 @@ impl Network {
{
// If we fail to get a packet within the connection initial timeout
// then we punt this connection
log_net!(warn "connection initial timeout from: {:?}", addr);
return;
}

View File

@ -56,7 +56,7 @@ impl ProtocolNetworkConnection {
}
}
pub async fn close(&mut self) -> Result<(), String> {
pub async fn close(&self) -> Result<(), String> {
match self {
Self::Dummy(d) => d.close(),
Self::RawTcp(t) => t.close().await,
@ -66,7 +66,7 @@ impl ProtocolNetworkConnection {
}
}
pub async fn send(&mut self, message: Vec<u8>) -> Result<(), String> {
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
match self {
Self::Dummy(d) => d.send(message),
Self::RawTcp(t) => t.send(message).await,
@ -75,7 +75,7 @@ impl ProtocolNetworkConnection {
Self::Wss(w) => w.send(message).await,
}
}
pub async fn recv(&mut self) -> Result<Vec<u8>, String> {
pub async fn recv(&self) -> Result<Vec<u8>, String> {
match self {
Self::Dummy(d) => d.recv(),
Self::RawTcp(t) => t.recv().await,

View File

@ -1,5 +1,4 @@
use super::*;
use crate::intf::native::utils::async_peek_stream::*;
use crate::intf::*;
use crate::network_manager::MAX_MESSAGE_SIZE;
use crate::*;
@ -22,37 +21,43 @@ impl RawTcpNetworkConnection {
Self { stream }
}
pub async fn close(&mut self) -> Result<(), String> {
pub async fn close(&self) -> Result<(), String> {
self.stream
.clone()
.close()
.await
.map_err(map_to_string)
.map_err(logthru_net!())
}
pub async fn send(&mut self, message: Vec<u8>) -> Result<(), String> {
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
log_net!("sending TCP message of size {}", message.len());
if message.len() > MAX_MESSAGE_SIZE {
return Err("sending too large TCP message".to_owned());
}
let len = message.len() as u16;
let header = [b'V', b'L', len as u8, (len >> 8) as u8];
self.stream
let mut stream = self.stream.clone();
stream
.write_all(&header)
.await
.map_err(map_to_string)
.map_err(logthru_net!())?;
self.stream
stream
.write_all(&message)
.await
.map_err(map_to_string)
.map_err(logthru_net!())
}
pub async fn recv(&mut self) -> Result<Vec<u8>, String> {
pub async fn recv(&self) -> Result<Vec<u8>, String> {
let mut header = [0u8; 4];
self.stream
let mut stream = self.stream.clone();
stream
.read_exact(&mut header)
.await
.map_err(|e| format!("TCP recv error: {}", e))?;
@ -65,10 +70,7 @@ impl RawTcpNetworkConnection {
}
let mut out: Vec<u8> = vec![0u8; len];
self.stream
.read_exact(&mut out)
.await
.map_err(map_to_string)?;
stream.read_exact(&mut out).await.map_err(map_to_string)?;
Ok(out)
}
}
@ -122,6 +124,8 @@ impl RawTcpProtocolHandler {
ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)),
);
warn!("on_accept_async from: {}", socket_addr);
Ok(Some(conn))
}

View File

@ -1,5 +1,4 @@
use super::*;
use crate::intf::native::utils::async_peek_stream::*;
use crate::intf::*;
use crate::network_manager::MAX_MESSAGE_SIZE;
use crate::*;
@ -19,31 +18,12 @@ pub type WebsocketNetworkConnectionWSS =
WebsocketNetworkConnection<async_tls::client::TlsStream<async_std::net::TcpStream>>;
pub type WebsocketNetworkConnectionWS = WebsocketNetworkConnection<async_std::net::TcpStream>;
struct WebSocketNetworkConnectionInner<T>
where
T: io::Read + io::Write + Send + Unpin + 'static,
{
ws_stream: WebSocketStream<T>,
}
pub struct WebsocketNetworkConnection<T>
where
T: io::Read + io::Write + Send + Unpin + 'static,
{
tls: bool,
inner: Arc<AsyncMutex<WebSocketNetworkConnectionInner<T>>>,
}
impl<T> Clone for WebsocketNetworkConnection<T>
where
T: io::Read + io::Write + Send + Unpin + 'static,
{
fn clone(&self) -> Self {
Self {
tls: self.tls,
inner: self.inner.clone(),
}
}
ws_stream: CloneStream<WebSocketStream<T>>,
}
impl<T> fmt::Debug for WebsocketNetworkConnection<T>
@ -62,38 +42,28 @@ where
pub fn new(tls: bool, ws_stream: WebSocketStream<T>) -> Self {
Self {
tls,
inner: Arc::new(AsyncMutex::new(WebSocketNetworkConnectionInner {
ws_stream,
})),
ws_stream: CloneStream::new(ws_stream),
}
}
pub async fn close(&self) -> Result<(), String> {
let mut inner = self.inner.lock().await;
inner
.ws_stream
.close(None)
.await
.map_err(map_to_string)
.map_err(logthru_net!(error "failed to close websocket"))
self.ws_stream.clone().close().await.map_err(map_to_string)
}
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
if message.len() > MAX_MESSAGE_SIZE {
return Err("received too large WS message".to_owned());
}
let mut inner = self.inner.lock().await;
inner
.ws_stream
self.ws_stream
.clone()
.send(Message::binary(message))
.await
.map_err(map_to_string)
.map_err(logthru_net!(error "failed to send websocket message"))
}
pub async fn recv(&self) -> Result<Vec<u8>, String> {
let mut inner = self.inner.lock().await;
let out = match inner.ws_stream.next().await {
pub async fn recv(&self) -> Result<Vec<u8>, String> {
let out = match self.ws_stream.clone().next().await {
Some(Ok(Message::Binary(v))) => v,
Some(Ok(_)) => {
return Err("Unexpected WS message type".to_owned()).map_err(logthru_net!(error));

View File

@ -1,83 +0,0 @@
pub use async_std::channel;
#[derive(Debug)]
pub struct Sender<T> {
imp: channel::Sender<T>,
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
imp: self.imp.clone(),
}
}
}
#[derive(Debug)]
pub struct Receiver<T> {
imp: channel::Receiver<T>,
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Self {
imp: self.imp.clone(),
}
}
}
pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let imp = channel::bounded(cap);
(Sender { imp: imp.0 }, Receiver { imp: imp.1 })
}
pub use channel::SendError;
pub use channel::TrySendError;
#[allow(dead_code)]
impl<T> Sender<T> {
// NOTE: This needs a timeout or you could block a very long time
// pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
// self.imp.send(msg).await
// }
pub async fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.imp.try_send(msg)
}
pub fn capacity(&self) -> usize {
self.imp.capacity().unwrap()
}
pub fn is_empty(&self) -> bool {
self.imp.is_empty()
}
pub fn is_full(&self) -> bool {
self.imp.is_full()
}
pub fn len(&self) -> usize {
self.imp.len()
}
}
pub use channel::RecvError;
pub use channel::TryRecvError;
#[allow(dead_code)]
impl<T> Receiver<T> {
pub async fn recv(&self) -> Result<T, RecvError> {
self.imp.recv().await
}
pub async fn try_recv(&self) -> Result<T, TryRecvError> {
self.imp.try_recv()
}
pub fn capacity(&self) -> usize {
self.imp.capacity().unwrap()
}
pub fn is_empty(&self) -> bool {
self.imp.is_empty()
}
pub fn is_full(&self) -> bool {
self.imp.is_full()
}
pub fn len(&self) -> usize {
self.imp.len()
}
}

View File

@ -1,66 +0,0 @@
use crate::xx::*;
use async_std::io::{Read, Result, Write};
use core::task::{Context, Poll};
use std::pin::Pin;
pub struct CloneStream<T>
where
T: Read + Write + Send + Unpin,
{
inner: Arc<Mutex<T>>,
}
impl<T> Clone for CloneStream<T>
where
T: Read + Write + Send + Unpin,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> CloneStream<T>
where
T: Read + Write + Send + Unpin,
{
pub fn new(t: T) -> Self {
Self {
inner: Arc::new(Mutex::new(t)),
}
}
}
impl<T> Read for CloneStream<T>
where
T: Read + Write + Send + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_read(cx, buf)
}
}
impl<T> Write for CloneStream<T>
where
T: Read + Write + Send + Unpin,
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_close(cx)
}
}

View File

@ -1,8 +1,5 @@
#[cfg(target_os = "android")]
pub mod android;
pub mod async_peek_stream;
pub mod channel;
pub mod clone_stream;
#[cfg(target_os = "ios")]
pub mod ios;
pub mod network_interfaces;

View File

@ -65,6 +65,8 @@ impl Network {
}
// Handle connection-oriented protocols
// Try to send to the exact existing connection if one exists
if let Some(conn) = self.connection_manager().get_connection(descriptor).await {
// connection exists, send over it
conn.send(data).await.map_err(logthru_net!())?;

View File

@ -46,20 +46,20 @@ impl ProtocolNetworkConnection {
}
}
}
pub async fn close(&mut self) -> Result<(), String> {
pub async fn close(&self) -> Result<(), String> {
match self {
Self::Dummy(d) => d.close(),
Self::Ws(w) => w.close().await,
}
}
pub async fn send(&mut self, message: Vec<u8>) -> Result<(), String> {
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
match self {
Self::Dummy(d) => d.send(message),
Self::Ws(w) => w.send(message).await,
}
}
pub async fn recv(&mut self) -> Result<Vec<u8>, String> {
pub async fn recv(&self) -> Result<Vec<u8>, String> {
match self {
Self::Dummy(d) => d.recv(),
Self::Ws(w) => w.recv().await,

View File

@ -1,154 +0,0 @@
use crate::xx::*;
use alloc::collections::VecDeque;
use core::fmt;
#[derive(Debug)]
pub struct Channel<T> {
items: VecDeque<T>,
cap: usize,
eventual: Eventual,
}
#[derive(Debug)]
pub struct Sender<T> {
imp: Arc<Mutex<Channel<T>>>,
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
imp: self.imp.clone(),
}
}
}
#[derive(Debug)]
pub struct Receiver<T> {
imp: Arc<Mutex<Channel<T>>>,
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Self {
imp: self.imp.clone(),
}
}
}
pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let imp = Channel {
items: VecDeque::with_capacity(cap),
cap,
eventual: Eventual::new(),
};
let imparc = Arc::new(Mutex::new(imp));
(
Sender {
imp: imparc.clone(),
},
Receiver {
imp: imparc.clone(),
},
)
}
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
Full(T),
Disconnected(T),
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TrySendError::Full(_) => {
write!(f, "Full")
}
TrySendError::Disconnected(_) => {
write!(f, "Disconnected")
}
}
}
}
impl<T> Sender<T> {
// NOTE: This needs a timeout or you could block a very long time
// pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
// xxx
// }
pub async fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let eventual = {
let mut inner = self.imp.lock();
if inner.items.len() == inner.cap {
return Err(TrySendError::Full(msg));
}
let empty = inner.items.is_empty();
inner.items.push_back(msg);
if empty {
Some(inner.eventual.clone())
} else {
None
}
};
if let Some(e) = eventual {
e.resolve().await;
}
Ok(())
}
pub fn capacity(&self) -> usize {
self.imp.lock().cap
}
pub fn is_empty(&self) -> bool {
self.imp.lock().items.is_empty()
}
pub fn is_full(&self) -> bool {
let inner = self.imp.lock();
inner.items.len() == inner.cap
}
pub fn len(&self) -> usize {
self.imp.lock().items.len()
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct RecvError;
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
Empty,
Disconnected,
}
impl<T> Receiver<T> {
pub async fn recv(&self) -> Result<T, RecvError> {
let eventual = {
let inner = self.imp.lock();
inner.eventual.clone()
};
while self.is_empty() {
eventual.instance_clone(true).await;
}
Ok(self.imp.lock().items.pop_front().unwrap())
}
pub async fn try_recv(&self) -> Result<T, TryRecvError> {
if self.is_empty() {
return Err(TryRecvError::Empty);
}
Ok(self.imp.lock().items.pop_front().unwrap())
}
pub fn capacity(&self) -> usize {
self.imp.lock().cap
}
pub fn is_empty(&self) -> bool {
self.imp.lock().items.is_empty()
}
pub fn is_full(&self) -> bool {
let inner = self.imp.lock();
inner.items.len() == inner.cap
}
pub fn len(&self) -> usize {
self.imp.lock().items.len()
}
}

View File

@ -1,7 +1,5 @@
#![cfg(target_arch = "wasm32")]
pub mod channel;
use crate::xx::*;
use core::sync::atomic::{AtomicI8, Ordering};
use js_sys::{global, Reflect};

View File

@ -1,6 +1,5 @@
#![deny(clippy::all)]
#![deny(unused_must_use)]
#![cfg_attr(target_arch = "wasm32", no_std)]
#[macro_use]
extern crate alloc;

View File

@ -5,41 +5,34 @@ use crate::*;
///////////////////////////////////////////////////////////
// Accept
cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
use async_std::net::*;
use utils::async_peek_stream::*;
pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync {
fn on_accept(
&self,
stream: AsyncPeekStream,
peer_addr: SocketAddr,
) -> SystemPinBoxFuture<Result<Option<NetworkConnection>, String>>;
}
pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync {
fn on_accept(
&self,
stream: AsyncPeekStream,
peer_addr: SocketAddr,
) -> SystemPinBoxFuture<Result<Option<NetworkConnection>, String>>;
}
pub trait ProtocolAcceptHandlerClone {
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>;
}
pub trait ProtocolAcceptHandlerClone {
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>;
}
impl<T> ProtocolAcceptHandlerClone for T
where
T: 'static + ProtocolAcceptHandler + Clone,
{
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler> {
Box::new(self.clone())
}
}
impl Clone for Box<dyn ProtocolAcceptHandler> {
fn clone(&self) -> Box<dyn ProtocolAcceptHandler> {
self.clone_box()
}
}
pub type NewProtocolAcceptHandler =
dyn Fn(VeilidConfig, bool, SocketAddr) -> Box<dyn ProtocolAcceptHandler> + Send;
impl<T> ProtocolAcceptHandlerClone for T
where
T: 'static + ProtocolAcceptHandler + Clone,
{
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler> {
Box::new(self.clone())
}
}
impl Clone for Box<dyn ProtocolAcceptHandler> {
fn clone(&self) -> Box<dyn ProtocolAcceptHandler> {
self.clone_box()
}
}
pub type NewProtocolAcceptHandler =
dyn Fn(VeilidConfig, bool, SocketAddr) -> Box<dyn ProtocolAcceptHandler> + Send;
///////////////////////////////////////////////////////////
// Dummy protocol network connection for testing
@ -64,7 +57,6 @@ impl DummyNetworkConnection {
#[derive(Debug)]
struct NetworkConnectionInner {
protocol_connection: ProtocolNetworkConnection,
last_message_sent_time: Option<u64>,
last_message_recv_time: Option<u64>,
}
@ -73,7 +65,8 @@ struct NetworkConnectionInner {
struct NetworkConnectionArc {
descriptor: ConnectionDescriptor,
established_time: u64,
inner: AsyncMutex<NetworkConnectionInner>,
protocol_connection: ProtocolNetworkConnection,
inner: Mutex<NetworkConnectionInner>,
}
#[derive(Clone, Debug)]
@ -89,9 +82,8 @@ impl PartialEq for NetworkConnection {
impl Eq for NetworkConnection {}
impl NetworkConnection {
fn new_inner(protocol_connection: ProtocolNetworkConnection) -> NetworkConnectionInner {
fn new_inner() -> NetworkConnectionInner {
NetworkConnectionInner {
protocol_connection,
last_message_sent_time: None,
last_message_recv_time: None,
}
@ -103,7 +95,8 @@ impl NetworkConnection {
NetworkConnectionArc {
descriptor,
established_time: intf::get_timestamp(),
inner: AsyncMutex::new(Self::new_inner(protocol_connection)),
protocol_connection,
inner: Mutex::new(Self::new_inner()),
}
}
@ -135,23 +128,24 @@ impl NetworkConnection {
}
pub async fn close(&self) -> Result<(), String> {
let mut inner = self.arc.inner.lock().await;
inner.protocol_connection.close().await
self.arc.protocol_connection.close().await
}
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
let mut inner = self.arc.inner.lock().await;
let out = inner.protocol_connection.send(message).await;
let ts = intf::get_timestamp();
let out = self.arc.protocol_connection.send(message).await;
if out.is_ok() {
inner.last_message_sent_time = Some(intf::get_timestamp());
let mut inner = self.arc.inner.lock();
inner.last_message_sent_time.max_assign(Some(ts));
}
out
}
pub async fn recv(&self) -> Result<Vec<u8>, String> {
let mut inner = self.arc.inner.lock().await;
let out = inner.protocol_connection.recv().await;
let ts = intf::get_timestamp();
let out = self.arc.protocol_connection.recv().await;
if out.is_ok() {
inner.last_message_recv_time = Some(intf::get_timestamp());
let mut inner = self.arc.inner.lock();
inner.last_message_recv_time.max_assign(Some(ts));
}
out
}

View File

@ -377,6 +377,7 @@ impl NetworkManager {
node_ref: NodeRef,
body: B,
) -> Result<(), String> {
log_net!("sending envelope to {:?}", node_ref);
// Get node's min/max version and see if we can send to it
// and if so, get the max version we can use
let version = if let Some((node_min, node_max)) = node_ref.operate(|e| e.min_max_version())
@ -388,7 +389,8 @@ impl NetworkManager {
node_ref.node_id(),
node_min,
node_max
));
))
.map_err(logthru_rpc!(warn));
}
cmp::min(node_max, MAX_VERSION)
} else {
@ -396,7 +398,9 @@ impl NetworkManager {
};
// Build the envelope to send
let out = self.build_envelope(node_ref.node_id(), version, body)?;
let out = self
.build_envelope(node_ref.node_id(), version, body)
.map_err(logthru_rpc!(error))?;
// Send via relay if we have to
self.net().send_data(node_ref, out).await
@ -433,6 +437,11 @@ impl NetworkManager {
data: &[u8],
descriptor: ConnectionDescriptor,
) -> Result<bool, String> {
log_net!(
"envelope of {} bytes received from {:?}",
data.len(),
descriptor
);
// Is this an out-of-band receipt instead of an envelope?
if data[0..4] == *RECEIPT_MAGIC {
self.process_receipt(data).await?;
@ -530,7 +539,6 @@ impl NetworkManager {
// Pass message to RPC system
rpc.enqueue_message(envelope, body, source_noderef)
.await
.map_err(|e| format!("enqueing rpc message failed: {}", e))?;
// Inform caller that we dealt with the envelope locally

View File

@ -6,7 +6,6 @@ pub use debug::*;
pub use private_route::*;
use crate::dht::*;
use crate::intf::utils::channel::*;
use crate::intf::*;
use crate::xx::*;
use crate::*;
@ -145,7 +144,7 @@ pub struct RPCProcessorInner {
routing_table: RoutingTable,
node_id: key::DHTKey,
node_id_secret: key::DHTKeySecret,
send_channel: Option<Sender<RPCMessage>>,
send_channel: Option<async_channel::Sender<RPCMessage>>,
timeout: u64,
max_route_hop_count: usize,
waiting_rpc_table: BTreeMap<OperationId, EventualValue<RPCMessageReader>>,
@ -394,9 +393,10 @@ impl RPCProcessor {
let (op_id, wants_answer, is_ping) = {
let operation = message
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_internal!("invalid operation"))?;
.map_err(map_error_internal!("invalid operation"))
.map_err(logthru_rpc!(error))?;
let op_id = operation.get_op_id();
let wants_answer = self.wants_answer(&operation)?;
let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?;
let is_ping = operation.get_detail().has_info_q();
(op_id, wants_answer, is_ping)
@ -490,7 +490,8 @@ impl RPCProcessor {
// Verify hop count isn't larger than out maximum routed hop count
if hopcount > self.inner.lock().max_route_hop_count {
return Err(rpc_error_internal("hop count too long for route"));
return Err(rpc_error_internal("hop count too long for route"))
.map_err(logthru_rpc!(warn));
}
// calculate actual timeout
// timeout is number of hops times the timeout per hop
@ -1245,7 +1246,7 @@ impl RPCProcessor {
}
}
async fn rpc_worker(self, receiver: Receiver<RPCMessage>) {
async fn rpc_worker(self, receiver: async_channel::Receiver<RPCMessage>) {
while let Ok(msg) = receiver.recv().await {
let _ = self
.process_rpc_message(msg)
@ -1284,7 +1285,7 @@ impl RPCProcessor {
}
inner.timeout = timeout;
inner.max_route_hop_count = max_route_hop_count;
let channel = channel(queue_size as usize);
let channel = async_channel::bounded(queue_size as usize);
inner.send_channel = Some(channel.0.clone());
// spin up N workers
@ -1303,7 +1304,7 @@ impl RPCProcessor {
*self.inner.lock() = Self::new_inner(self.network_manager());
}
pub async fn enqueue_message(
pub fn enqueue_message(
&self,
envelope: envelope::Envelope,
body: Vec<u8>,
@ -1324,7 +1325,6 @@ impl RPCProcessor {
};
send_channel
.try_send(msg)
.await
.map_err(|e| format!("failed to enqueue received RPC message: {:?}", e))?;
Ok(())
}

View File

@ -1,4 +1,3 @@
use crate::intf::utils::async_peek_stream::*;
use crate::xx::*;
use async_std::io;
use async_std::net::{TcpListener, TcpStream};

View File

@ -1,7 +1,10 @@
use crate::xx::*;
use async_std::io::{Read, ReadExt, Result, Write};
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::AsyncRead as Read;
use futures_util::AsyncReadExt;
use futures_util::AsyncWrite as Write;
use std::io::Result;
////////
///
@ -168,4 +171,4 @@ impl Write for AsyncPeekStream {
}
}
impl std::marker::Unpin for AsyncPeekStream {}
impl core::marker::Unpin for AsyncPeekStream {}

View File

@ -0,0 +1,111 @@
use crate::xx::*;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_util::AsyncRead as Read;
use futures_util::AsyncWrite as Write;
use futures_util::Sink;
use futures_util::Stream;
use std::io;
pub struct CloneStream<T>
where
T: Unpin,
{
inner: Arc<Mutex<T>>,
}
impl<T> Clone for CloneStream<T>
where
T: Unpin,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> CloneStream<T>
where
T: Unpin,
{
pub fn new(t: T) -> Self {
Self {
inner: Arc::new(Mutex::new(t)),
}
}
}
impl<T> Read for CloneStream<T>
where
T: Read + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_read(cx, buf)
}
}
impl<T> Write for CloneStream<T>
where
T: Write + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_close(cx)
}
}
impl<T> Stream for CloneStream<T>
where
T: Stream + Unpin,
{
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_next(cx)
}
}
impl<T, Item> Sink<Item> for CloneStream<T>
where
T: Sink<Item> + Unpin,
{
type Error = T::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.inner.lock();
Pin::new(&mut *inner).poll_close(cx)
}
}

View File

@ -36,6 +36,14 @@ macro_rules! log_net {
(error $fmt:literal, $($arg:expr),+) => {
error!(target:"net", $fmt, $($arg),+);
};
(warn $text:expr) => {warn!(
target: "net",
"{}",
$text,
)};
(warn $fmt:literal, $($arg:expr),+) => {
warn!(target:"net", $fmt, $($arg),+);
};
($text:expr) => {trace!(
target: "net",
"{}",
@ -56,6 +64,14 @@ macro_rules! log_rpc {
(error $fmt:literal, $($arg:expr),+) => {
error!(target:"rpc", $fmt, $($arg),+);
};
(warn $text:expr) => { warn!(
target: "rpc",
"{}",
$text,
)};
(warn $fmt:literal, $($arg:expr),+) => {
warn!(target:"rpc", $fmt, $($arg),+);
};
($text:expr) => {trace!(
target: "rpc",
"{}",
@ -76,6 +92,14 @@ macro_rules! log_rtab {
(error $fmt:literal, $($arg:expr),+) => {
error!(target:"rtab", $fmt, $($arg),+);
};
(warn $text:expr) => { warn!(
target: "rtab",
"{}",
$text,
)};
(warn $fmt:literal, $($arg:expr),+) => {
warn!(target:"rtab", $fmt, $($arg),+);
};
($text:expr) => {trace!(
target: "rtab",
"{}",
@ -153,6 +177,33 @@ macro_rules! logthru {
);
e__
});
// warn
(warn $target:literal) => (|e__| {
warn!(
target: $target,
"[{}]",
e__,
);
e__
});
(warn $target:literal, $text:literal) => (|e__| {
warn!(
target: $target,
"[{}] {}",
e__,
$text
);
e__
});
(warn $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
warn!(
target: $target,
concat!("[{}] ", $fmt),
e__,
$($arg),+
);
e__
});
// debug
(debug $target:literal) => (|e__| {
debug!(

View File

@ -1,4 +1,6 @@
// mod bump_port;
mod async_peek_stream;
mod clone_stream;
mod eventual;
mod eventual_base;
mod eventual_value;
@ -68,6 +70,7 @@ cfg_if! {
pub use async_std::pin::Pin;
pub use async_std::sync::Mutex as AsyncMutex;
pub use async_std::sync::MutexGuard as AsyncMutexGuard;
pub use async_std::channel as async_channel;
pub use std::net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr };
pub type SystemPinBoxFuture<T> = PinBox<dyn Future<Output = T> + Send + 'static>;
pub type SystemPinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + Send + 'a>;
@ -75,6 +78,8 @@ cfg_if! {
}
// pub use bump_port::*;
pub use async_peek_stream::*;
pub use clone_stream::*;
pub use eventual::*;
pub use eventual_base::{EventualCommon, EventualResolvedFuture};
pub use eventual_value::*;