add direct bootstrap fallback
This commit is contained in:
parent
0adcc70bc9
commit
a475028c75
@ -1091,6 +1091,26 @@ impl NetworkManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Direct bootstrap request
|
||||||
|
pub async fn boot_request(&self, dial_info: DialInfo) -> Result<Vec<PeerInfo>, String> {
|
||||||
|
let timeout_ms = {
|
||||||
|
let c = self.config.get();
|
||||||
|
c.network.rpc.timeout_ms
|
||||||
|
};
|
||||||
|
// Send boot magic to requested peer address
|
||||||
|
let data = BOOT_MAGIC.to_vec();
|
||||||
|
let out_data: Vec<u8> = self
|
||||||
|
.net()
|
||||||
|
.send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let bootstrap_peerinfo: Vec<PeerInfo> =
|
||||||
|
deserialize_json(std::str::from_utf8(&out_data).map_err(map_to_string)?)
|
||||||
|
.map_err(map_to_string)?;
|
||||||
|
|
||||||
|
Ok(bootstrap_peerinfo)
|
||||||
|
}
|
||||||
|
|
||||||
// Called when a packet potentially containing an RPC envelope is received by a low-level
|
// Called when a packet potentially containing an RPC envelope is received by a low-level
|
||||||
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
|
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
|
||||||
// and passes it to the RPC handler
|
// and passes it to the RPC handler
|
||||||
|
@ -297,6 +297,50 @@ impl Network {
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send data to a dial info, unbound, using a new connection from a random port
|
||||||
|
// Waits for a specified amount of time to receive a single response
|
||||||
|
// This creates a short-lived connection in the case of connection-oriented protocols
|
||||||
|
// for the purpose of sending this one message.
|
||||||
|
// This bypasses the connection table as it is not a 'node to node' connection.
|
||||||
|
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len(), ret.len))]
|
||||||
|
pub async fn send_recv_data_unbound_to_dial_info(
|
||||||
|
&self,
|
||||||
|
dial_info: DialInfo,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
let data_len = data.len();
|
||||||
|
let out = match dial_info.protocol_type() {
|
||||||
|
ProtocolType::UDP => {
|
||||||
|
let peer_socket_addr = dial_info.to_socket_addr();
|
||||||
|
RawUdpProtocolHandler::send_recv_unbound_message(peer_socket_addr, data, timeout_ms)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
ProtocolType::TCP => {
|
||||||
|
let peer_socket_addr = dial_info.to_socket_addr();
|
||||||
|
RawTcpProtocolHandler::send_recv_unbound_message(peer_socket_addr, data, timeout_ms)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
ProtocolType::WS | ProtocolType::WSS => {
|
||||||
|
WebsocketProtocolHandler::send_recv_unbound_message(
|
||||||
|
dial_info.clone(),
|
||||||
|
data,
|
||||||
|
timeout_ms,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Network accounting
|
||||||
|
self.network_manager()
|
||||||
|
.stats_packet_sent(dial_info.to_ip_addr(), data_len as u64);
|
||||||
|
self.network_manager()
|
||||||
|
.stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64);
|
||||||
|
|
||||||
|
tracing::Span::current().record("ret.len", &out.len());
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
|
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
|
||||||
pub async fn send_data_to_existing_connection(
|
pub async fn send_data_to_existing_connection(
|
||||||
&self,
|
&self,
|
||||||
|
@ -51,6 +51,37 @@ impl ProtocolNetworkConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn send_recv_unbound_message(
|
||||||
|
dial_info: DialInfo,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
match dial_info.protocol_type() {
|
||||||
|
ProtocolType::UDP => {
|
||||||
|
let peer_socket_addr = dial_info.to_socket_addr();
|
||||||
|
udp::RawUdpProtocolHandler::send_recv_unbound_message(
|
||||||
|
peer_socket_addr,
|
||||||
|
data,
|
||||||
|
timeout_ms,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
ProtocolType::TCP => {
|
||||||
|
let peer_socket_addr = dial_info.to_socket_addr();
|
||||||
|
tcp::RawTcpProtocolHandler::send_recv_unbound_message(
|
||||||
|
peer_socket_addr,
|
||||||
|
data,
|
||||||
|
timeout_ms,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
ProtocolType::WS | ProtocolType::WSS => {
|
||||||
|
ws::WebsocketProtocolHandler::send_recv_unbound_message(dial_info, data, timeout_ms)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn descriptor(&self) -> ConnectionDescriptor {
|
pub fn descriptor(&self) -> ConnectionDescriptor {
|
||||||
match self {
|
match self {
|
||||||
Self::Dummy(d) => d.descriptor(),
|
Self::Dummy(d) => d.descriptor(),
|
||||||
|
@ -41,7 +41,7 @@ impl RawTcpNetworkConnection {
|
|||||||
.map_err(map_to_string)
|
.map_err(map_to_string)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_internal(mut stream: AsyncPeekStream, message: Vec<u8>) -> Result<(), String> {
|
async fn send_internal(stream: &mut AsyncPeekStream, message: Vec<u8>) -> Result<(), String> {
|
||||||
log_net!("sending TCP message of size {}", message.len());
|
log_net!("sending TCP message of size {}", message.len());
|
||||||
if message.len() > MAX_MESSAGE_SIZE {
|
if message.len() > MAX_MESSAGE_SIZE {
|
||||||
return Err("sending too large TCP message".to_owned());
|
return Err("sending too large TCP message".to_owned());
|
||||||
@ -55,16 +55,13 @@ impl RawTcpNetworkConnection {
|
|||||||
|
|
||||||
#[instrument(level="trace", err, skip(self, message), fields(message.len = message.len()))]
|
#[instrument(level="trace", err, skip(self, message), fields(message.len = message.len()))]
|
||||||
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
|
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
|
||||||
let stream = self.stream.clone();
|
let mut stream = self.stream.clone();
|
||||||
Self::send_internal(stream, message).await
|
Self::send_internal(&mut stream, message).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level="trace", err, skip(self), fields(message.len))]
|
pub async fn recv_internal(stream: &mut AsyncPeekStream) -> Result<Vec<u8>, String> {
|
||||||
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
|
||||||
let mut header = [0u8; 4];
|
let mut header = [0u8; 4];
|
||||||
|
|
||||||
let mut stream = self.stream.clone();
|
|
||||||
|
|
||||||
stream
|
stream
|
||||||
.read_exact(&mut header)
|
.read_exact(&mut header)
|
||||||
.await
|
.await
|
||||||
@ -80,7 +77,14 @@ impl RawTcpNetworkConnection {
|
|||||||
let mut out: Vec<u8> = vec![0u8; len];
|
let mut out: Vec<u8> = vec![0u8; len];
|
||||||
stream.read_exact(&mut out).await.map_err(map_to_string)?;
|
stream.read_exact(&mut out).await.map_err(map_to_string)?;
|
||||||
|
|
||||||
tracing::Span::current().record("message.len", &out.len());
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(level="trace", err, skip(self), fields(ret.len))]
|
||||||
|
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
||||||
|
let mut stream = self.stream.clone();
|
||||||
|
let out = Self::recv_internal(&mut stream).await?;
|
||||||
|
tracing::Span::current().record("ret.len", &out.len());
|
||||||
Ok(out)
|
Ok(out)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -212,11 +216,54 @@ impl RawTcpProtocolHandler {
|
|||||||
// .local_addr()
|
// .local_addr()
|
||||||
// .map_err(map_to_string)
|
// .map_err(map_to_string)
|
||||||
// .map_err(logthru_net!("could not get local address from TCP stream"))?;
|
// .map_err(logthru_net!("could not get local address from TCP stream"))?;
|
||||||
let ps = AsyncPeekStream::new(ts.clone());
|
let mut ps = AsyncPeekStream::new(ts.clone());
|
||||||
|
|
||||||
// Send directly from the raw network connection
|
// Send directly from the raw network connection
|
||||||
// this builds the connection and tears it down immediately after the send
|
// this builds the connection and tears it down immediately after the send
|
||||||
RawTcpNetworkConnection::send_internal(ps, data).await
|
RawTcpNetworkConnection::send_internal(&mut ps, data).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))]
|
||||||
|
pub async fn send_recv_unbound_message(
|
||||||
|
socket_addr: SocketAddr,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
if data.len() > MAX_MESSAGE_SIZE {
|
||||||
|
return Err("sending too large unbound TCP message".to_owned());
|
||||||
|
}
|
||||||
|
trace!(
|
||||||
|
"sending unbound message of length {} to {}",
|
||||||
|
data.len(),
|
||||||
|
socket_addr
|
||||||
|
);
|
||||||
|
|
||||||
|
// Make a shared socket
|
||||||
|
let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?;
|
||||||
|
|
||||||
|
// Non-blocking connect to remote address
|
||||||
|
let ts = nonblocking_connect(socket, socket_addr)
|
||||||
|
.await
|
||||||
|
.map_err(map_to_string)
|
||||||
|
.map_err(logthru_net!(error "remote_addr={}", socket_addr))?;
|
||||||
|
|
||||||
|
// See what local address we ended up with and turn this into a stream
|
||||||
|
// let actual_local_address = ts
|
||||||
|
// .local_addr()
|
||||||
|
// .map_err(map_to_string)
|
||||||
|
// .map_err(logthru_net!("could not get local address from TCP stream"))?;
|
||||||
|
let mut ps = AsyncPeekStream::new(ts.clone());
|
||||||
|
|
||||||
|
// Send directly from the raw network connection
|
||||||
|
// this builds the connection and tears it down immediately after the send
|
||||||
|
RawTcpNetworkConnection::send_internal(&mut ps, data).await?;
|
||||||
|
|
||||||
|
let out = timeout(timeout_ms, RawTcpNetworkConnection::recv_internal(&mut ps))
|
||||||
|
.await
|
||||||
|
.map_err(map_to_string)??;
|
||||||
|
|
||||||
|
tracing::Span::current().record("ret.len", &out.len());
|
||||||
|
Ok(out)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,6 +68,7 @@ impl RawUdpProtocolHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))]
|
||||||
pub async fn send_unbound_message(
|
pub async fn send_unbound_message(
|
||||||
socket_addr: SocketAddr,
|
socket_addr: SocketAddr,
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
@ -104,4 +105,61 @@ impl RawUdpProtocolHandler {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))]
|
||||||
|
pub async fn send_recv_unbound_message(
|
||||||
|
socket_addr: SocketAddr,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
if data.len() > MAX_MESSAGE_SIZE {
|
||||||
|
return Err("sending too large unbound UDP message".to_owned())
|
||||||
|
.map_err(logthru_net!(error));
|
||||||
|
}
|
||||||
|
log_net!(
|
||||||
|
"sending unbound message of length {} to {}",
|
||||||
|
data.len(),
|
||||||
|
socket_addr
|
||||||
|
);
|
||||||
|
|
||||||
|
// get local wildcard address for bind
|
||||||
|
let local_socket_addr = match socket_addr {
|
||||||
|
SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
||||||
|
SocketAddr::V6(_) => {
|
||||||
|
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// get unspecified bound socket
|
||||||
|
let socket = UdpSocket::bind(local_socket_addr)
|
||||||
|
.await
|
||||||
|
.map_err(map_to_string)
|
||||||
|
.map_err(logthru_net!(error "failed to bind unbound udp socket"))?;
|
||||||
|
let len = socket
|
||||||
|
.send_to(&data, socket_addr)
|
||||||
|
.await
|
||||||
|
.map_err(map_to_string)
|
||||||
|
.map_err(logthru_net!(error "failed unbound udp send: addr={}", socket_addr))?;
|
||||||
|
if len != data.len() {
|
||||||
|
return Err("UDP partial unbound send".to_owned()).map_err(logthru_net!(error));
|
||||||
|
}
|
||||||
|
|
||||||
|
// receive single response
|
||||||
|
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
|
||||||
|
let (len, from_addr) = timeout(timeout_ms, socket.recv_from(&mut out))
|
||||||
|
.await
|
||||||
|
.map_err(map_to_string)?
|
||||||
|
.map_err(map_to_string)?;
|
||||||
|
|
||||||
|
// if the from address is not the same as the one we sent to, then drop this
|
||||||
|
if from_addr != socket_addr {
|
||||||
|
return Err(format!(
|
||||||
|
"Unbound response received from wrong address: addr={}",
|
||||||
|
from_addr,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
out.resize(len, 0u8);
|
||||||
|
tracing::Span::current().record("ret.len", &len);
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ where
|
|||||||
.map_err(logthru_net!(error "failed to send websocket message"))
|
.map_err(logthru_net!(error "failed to send websocket message"))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level="trace", err, skip(self), fields(message.len))]
|
#[instrument(level = "trace", err, skip(self), fields(ret.len))]
|
||||||
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
||||||
let out = match self.stream.clone().next().await {
|
let out = match self.stream.clone().next().await {
|
||||||
Some(Ok(Message::Binary(v))) => v,
|
Some(Ok(Message::Binary(v))) => v,
|
||||||
@ -89,7 +89,7 @@ where
|
|||||||
if out.len() > MAX_MESSAGE_SIZE {
|
if out.len() > MAX_MESSAGE_SIZE {
|
||||||
Err("sending too large WS message".to_owned()).map_err(logthru_net!(error))
|
Err("sending too large WS message".to_owned()).map_err(logthru_net!(error))
|
||||||
} else {
|
} else {
|
||||||
tracing::Span::current().record("message.len", &out.len());
|
tracing::Span::current().record("ret.len", &out.len());
|
||||||
Ok(out)
|
Ok(out)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -283,11 +283,6 @@ impl WebsocketProtocolHandler {
|
|||||||
if data.len() > MAX_MESSAGE_SIZE {
|
if data.len() > MAX_MESSAGE_SIZE {
|
||||||
return Err("sending too large unbound WS message".to_owned());
|
return Err("sending too large unbound WS message".to_owned());
|
||||||
}
|
}
|
||||||
trace!(
|
|
||||||
"sending unbound websocket message of length {} to {}",
|
|
||||||
data.len(),
|
|
||||||
dial_info,
|
|
||||||
);
|
|
||||||
|
|
||||||
let protconn = Self::connect_internal(None, dial_info.clone())
|
let protconn = Self::connect_internal(None, dial_info.clone())
|
||||||
.await
|
.await
|
||||||
@ -295,6 +290,29 @@ impl WebsocketProtocolHandler {
|
|||||||
|
|
||||||
protconn.send(data).await
|
protconn.send(data).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))]
|
||||||
|
pub async fn send_recv_unbound_message(
|
||||||
|
dial_info: DialInfo,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
if data.len() > MAX_MESSAGE_SIZE {
|
||||||
|
return Err("sending too large unbound WS message".to_owned());
|
||||||
|
}
|
||||||
|
|
||||||
|
let protconn = Self::connect_internal(None, dial_info.clone())
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?;
|
||||||
|
|
||||||
|
protconn.send(data).await?;
|
||||||
|
let out = timeout(timeout_ms, protconn.recv())
|
||||||
|
.await
|
||||||
|
.map_err(map_to_string)??;
|
||||||
|
|
||||||
|
tracing::Span::current().record("ret.len", &out.len());
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ProtocolAcceptHandler for WebsocketProtocolHandler {
|
impl ProtocolAcceptHandler for WebsocketProtocolHandler {
|
||||||
|
@ -79,6 +79,46 @@ impl Network {
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send data to a dial info, unbound, using a new connection from a random port
|
||||||
|
// Waits for a specified amount of time to receive a single response
|
||||||
|
// This creates a short-lived connection in the case of connection-oriented protocols
|
||||||
|
// for the purpose of sending this one message.
|
||||||
|
// This bypasses the connection table as it is not a 'node to node' connection.
|
||||||
|
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len(), ret.len))]
|
||||||
|
pub async fn send_recv_data_unbound_to_dial_info(
|
||||||
|
&self,
|
||||||
|
dial_info: DialInfo,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
let data_len = data.len();
|
||||||
|
let out = match dial_info.protocol_type() {
|
||||||
|
ProtocolType::UDP => {
|
||||||
|
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||||
|
}
|
||||||
|
ProtocolType::TCP => {
|
||||||
|
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||||
|
}
|
||||||
|
ProtocolType::WS | ProtocolType::WSS => {
|
||||||
|
WebsocketProtocolHandler::send_recv_unbound_message(
|
||||||
|
dial_info.clone(),
|
||||||
|
data,
|
||||||
|
timeout_ms,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Network accounting
|
||||||
|
self.network_manager()
|
||||||
|
.stats_packet_sent(dial_info.to_ip_addr(), data_len as u64);
|
||||||
|
self.network_manager()
|
||||||
|
.stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64);
|
||||||
|
|
||||||
|
tracing::Span::current().record("ret.len", &out.len());
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
|
#[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))]
|
||||||
pub async fn send_data_to_existing_connection(
|
pub async fn send_data_to_existing_connection(
|
||||||
&self,
|
&self,
|
||||||
|
@ -18,10 +18,10 @@ impl ProtocolNetworkConnection {
|
|||||||
) -> Result<ProtocolNetworkConnection, String> {
|
) -> Result<ProtocolNetworkConnection, String> {
|
||||||
match dial_info.protocol_type() {
|
match dial_info.protocol_type() {
|
||||||
ProtocolType::UDP => {
|
ProtocolType::UDP => {
|
||||||
panic!("UDP dial info is not support on WASM targets");
|
panic!("UDP dial info is not supported on WASM targets");
|
||||||
}
|
}
|
||||||
ProtocolType::TCP => {
|
ProtocolType::TCP => {
|
||||||
panic!("TCP dial info is not support on WASM targets");
|
panic!("TCP dial info is not supported on WASM targets");
|
||||||
}
|
}
|
||||||
ProtocolType::WS | ProtocolType::WSS => {
|
ProtocolType::WS | ProtocolType::WSS => {
|
||||||
ws::WebsocketProtocolHandler::connect(local_address, dial_info).await
|
ws::WebsocketProtocolHandler::connect(local_address, dial_info).await
|
||||||
@ -35,10 +35,10 @@ impl ProtocolNetworkConnection {
|
|||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
match dial_info.protocol_type() {
|
match dial_info.protocol_type() {
|
||||||
ProtocolType::UDP => {
|
ProtocolType::UDP => {
|
||||||
panic!("UDP dial info is not support on WASM targets");
|
panic!("UDP dial info is not supported on WASM targets");
|
||||||
}
|
}
|
||||||
ProtocolType::TCP => {
|
ProtocolType::TCP => {
|
||||||
panic!("TCP dial info is not support on WASM targets");
|
panic!("TCP dial info is not supported on WASM targets");
|
||||||
}
|
}
|
||||||
ProtocolType::WS | ProtocolType::WSS => {
|
ProtocolType::WS | ProtocolType::WSS => {
|
||||||
ws::WebsocketProtocolHandler::send_unbound_message(dial_info, data).await
|
ws::WebsocketProtocolHandler::send_unbound_message(dial_info, data).await
|
||||||
@ -46,6 +46,25 @@ impl ProtocolNetworkConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn send_recv_unbound_message(
|
||||||
|
dial_info: DialInfo,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
match dial_info.protocol_type() {
|
||||||
|
ProtocolType::UDP => {
|
||||||
|
panic!("UDP dial info is not supported on WASM targets");
|
||||||
|
}
|
||||||
|
ProtocolType::TCP => {
|
||||||
|
panic!("TCP dial info is not supported on WASM targets");
|
||||||
|
}
|
||||||
|
ProtocolType::WS | ProtocolType::WSS => {
|
||||||
|
ws::WebsocketProtocolHandler::send_recv_unbound_message(dial_info, data, timeout_ms)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn descriptor(&self) -> ConnectionDescriptor {
|
pub fn descriptor(&self) -> ConnectionDescriptor {
|
||||||
match self {
|
match self {
|
||||||
Self::Dummy(d) => d.descriptor(),
|
Self::Dummy(d) => d.descriptor(),
|
||||||
|
@ -36,10 +36,12 @@ impl WebsocketNetworkConnection {
|
|||||||
self.descriptor.clone()
|
self.descriptor.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(self))]
|
||||||
pub async fn close(&self) -> Result<(), String> {
|
pub async fn close(&self) -> Result<(), String> {
|
||||||
self.inner.ws_meta.close().await.map_err(map_to_string).map(drop)
|
self.inner.ws_meta.close().await.map_err(map_to_string).map(drop)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(self, message), fields(message.len = message.len()))]
|
||||||
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
|
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
|
||||||
if message.len() > MAX_MESSAGE_SIZE {
|
if message.len() > MAX_MESSAGE_SIZE {
|
||||||
return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error));
|
return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error));
|
||||||
@ -49,6 +51,8 @@ impl WebsocketNetworkConnection {
|
|||||||
.map_err(|_| "failed to send to websocket".to_owned())
|
.map_err(|_| "failed to send to websocket".to_owned())
|
||||||
.map_err(logthru_net!(error))
|
.map_err(logthru_net!(error))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(self), fields(ret.len))]
|
||||||
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
||||||
let out = match self.inner.ws_stream.clone().next().await {
|
let out = match self.inner.ws_stream.clone().next().await {
|
||||||
Some(WsMessage::Binary(v)) => v,
|
Some(WsMessage::Binary(v)) => v,
|
||||||
@ -74,6 +78,7 @@ impl WebsocketNetworkConnection {
|
|||||||
pub struct WebsocketProtocolHandler {}
|
pub struct WebsocketProtocolHandler {}
|
||||||
|
|
||||||
impl WebsocketProtocolHandler {
|
impl WebsocketProtocolHandler {
|
||||||
|
#[instrument(level = "trace", err)]
|
||||||
pub async fn connect(
|
pub async fn connect(
|
||||||
local_address: Option<SocketAddr>,
|
local_address: Option<SocketAddr>,
|
||||||
dial_info: DialInfo,
|
dial_info: DialInfo,
|
||||||
@ -99,21 +104,16 @@ impl WebsocketProtocolHandler {
|
|||||||
.map_err(logthru_net!(error))?;
|
.map_err(logthru_net!(error))?;
|
||||||
|
|
||||||
// Make our connection descriptor
|
// Make our connection descriptor
|
||||||
|
|
||||||
Ok(ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(ConnectionDescriptor::new_no_local(
|
Ok(ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(ConnectionDescriptor::new_no_local(
|
||||||
dial_info.to_peer_address(),
|
dial_info.to_peer_address(),
|
||||||
), wsmeta, wsio)))
|
), wsmeta, wsio)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))]
|
||||||
pub async fn send_unbound_message(dial_info: DialInfo, data: Vec<u8>) -> Result<(), String> {
|
pub async fn send_unbound_message(dial_info: DialInfo, data: Vec<u8>) -> Result<(), String> {
|
||||||
if data.len() > MAX_MESSAGE_SIZE {
|
if data.len() > MAX_MESSAGE_SIZE {
|
||||||
return Err("sending too large unbound WS message".to_owned());
|
return Err("sending too large unbound WS message".to_owned());
|
||||||
}
|
}
|
||||||
trace!(
|
|
||||||
"sending unbound websocket message of length {} to {}",
|
|
||||||
data.len(),
|
|
||||||
dial_info,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Make the real connection
|
// Make the real connection
|
||||||
let conn = Self::connect(None, dial_info)
|
let conn = Self::connect(None, dial_info)
|
||||||
@ -121,6 +121,30 @@ impl WebsocketProtocolHandler {
|
|||||||
.map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?;
|
.map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?;
|
||||||
|
|
||||||
conn.send(data).await
|
conn.send(data).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))]
|
||||||
|
pub async fn send_recv_unbound_message(
|
||||||
|
dial_info: DialInfo,
|
||||||
|
data: Vec<u8>,
|
||||||
|
timeout_ms: u32,
|
||||||
|
) -> Result<Vec<u8>, String> {
|
||||||
|
if data.len() > MAX_MESSAGE_SIZE {
|
||||||
|
return Err("sending too large unbound WS message".to_owned());
|
||||||
|
}
|
||||||
|
|
||||||
|
let conn = Self::connect(None, dial_info.clone())
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?;
|
||||||
|
|
||||||
|
conn.send(data).await?;
|
||||||
|
let out = timeout(timeout_ms, conn.recv())
|
||||||
|
.await
|
||||||
|
.map_err(map_to_string)??;
|
||||||
|
|
||||||
|
tracing::Span::current().record("ret.len", &out.len());
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@ -187,7 +187,7 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// add all nodes from buckets
|
// add all nodes from buckets
|
||||||
Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
||||||
// Apply filter
|
// Apply filter
|
||||||
if filter(k, Some(v.clone())) {
|
if filter(k, Some(v.clone())) {
|
||||||
nodes.push((k, Some(v.clone())));
|
nodes.push((k, Some(v.clone())));
|
||||||
@ -352,7 +352,7 @@ impl RoutingTable {
|
|||||||
let mut best_inbound_relay: Option<(DHTKey, Arc<BucketEntry>)> = None;
|
let mut best_inbound_relay: Option<(DHTKey, Arc<BucketEntry>)> = None;
|
||||||
|
|
||||||
// Iterate all known nodes for candidates
|
// Iterate all known nodes for candidates
|
||||||
Self::with_entries_unlocked(inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
||||||
// Ensure this node is not on our local network
|
// Ensure this node is not on our local network
|
||||||
if v.with(|e| {
|
if v.with(|e| {
|
||||||
e.local_node_info()
|
e.local_node_info()
|
||||||
|
@ -337,7 +337,7 @@ impl RoutingTable {
|
|||||||
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
|
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
|
||||||
if matches!(domain, RoutingDomain::PublicInternet) {
|
if matches!(domain, RoutingDomain::PublicInternet) {
|
||||||
let cur_ts = intf::get_timestamp();
|
let cur_ts = intf::get_timestamp();
|
||||||
Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Dead, |_, v| {
|
Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| {
|
||||||
v.with_mut(|e| e.set_seen_our_node_info(false));
|
v.with_mut(|e| e.set_seen_our_node_info(false));
|
||||||
Option::<()>::None
|
Option::<()>::None
|
||||||
});
|
});
|
||||||
@ -450,18 +450,13 @@ impl RoutingTable {
|
|||||||
let inner = this.inner.read();
|
let inner = this.inner.read();
|
||||||
let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
let mut node_refs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
||||||
let cur_ts = intf::get_timestamp();
|
let cur_ts = intf::get_timestamp();
|
||||||
Self::with_entries_unlocked(
|
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
||||||
&*inner,
|
|
||||||
cur_ts,
|
|
||||||
BucketEntryState::Unreliable,
|
|
||||||
|k, v| {
|
|
||||||
// Only update nodes that haven't seen our node info yet
|
// Only update nodes that haven't seen our node info yet
|
||||||
if !v.with(|e| e.has_seen_our_node_info()) {
|
if !v.with(|e| e.has_seen_our_node_info()) {
|
||||||
node_refs.push(NodeRef::new(this.clone(), k, v, None));
|
node_refs.push(NodeRef::new(this.clone(), k, v, None));
|
||||||
}
|
}
|
||||||
Option::<()>::None
|
Option::<()>::None
|
||||||
},
|
});
|
||||||
);
|
|
||||||
node_refs
|
node_refs
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -540,14 +535,14 @@ impl RoutingTable {
|
|||||||
fn get_entry_count(inner: &RoutingTableInner, min_state: BucketEntryState) -> usize {
|
fn get_entry_count(inner: &RoutingTableInner, min_state: BucketEntryState) -> usize {
|
||||||
let mut count = 0usize;
|
let mut count = 0usize;
|
||||||
let cur_ts = intf::get_timestamp();
|
let cur_ts = intf::get_timestamp();
|
||||||
Self::with_entries_unlocked(inner, cur_ts, min_state, |_, _| {
|
Self::with_entries(inner, cur_ts, min_state, |_, _| {
|
||||||
count += 1;
|
count += 1;
|
||||||
Option::<()>::None
|
Option::<()>::None
|
||||||
});
|
});
|
||||||
count
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
fn with_entries_unlocked<T, F: FnMut(DHTKey, Arc<BucketEntry>) -> Option<T>>(
|
fn with_entries<T, F: FnMut(DHTKey, Arc<BucketEntry>) -> Option<T>>(
|
||||||
inner: &RoutingTableInner,
|
inner: &RoutingTableInner,
|
||||||
cur_ts: u64,
|
cur_ts: u64,
|
||||||
min_state: BucketEntryState,
|
min_state: BucketEntryState,
|
||||||
@ -565,54 +560,6 @@ impl RoutingTable {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
// fn with_entries<T, F: FnMut(&DHTKey, &BucketEntryInner) -> Option<T>>(
|
|
||||||
// inner: &RoutingTableInner,
|
|
||||||
// cur_ts: u64,
|
|
||||||
// min_state: BucketEntryState,
|
|
||||||
// mut f: F,
|
|
||||||
// ) -> Option<T> {
|
|
||||||
// for bucket in &inner.buckets {
|
|
||||||
// for entry in bucket.entries() {
|
|
||||||
// let out = entry.1.with(|e| {
|
|
||||||
// if e.state(cur_ts) >= min_state {
|
|
||||||
// if let Some(out) = f(entry.0, e) {
|
|
||||||
// return Some(out);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// None
|
|
||||||
// });
|
|
||||||
// if out.is_some() {
|
|
||||||
// return out;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// None
|
|
||||||
// }
|
|
||||||
|
|
||||||
// fn with_entries_mut<T, F: FnMut(&DHTKey, &mut BucketEntryInner) -> Option<T>>(
|
|
||||||
// inner: &RoutingTableInner,
|
|
||||||
// cur_ts: u64,
|
|
||||||
// min_state: BucketEntryState,
|
|
||||||
// mut f: F,
|
|
||||||
// ) -> Option<T> {
|
|
||||||
// for bucket in &inner.buckets {
|
|
||||||
// for entry in bucket.entries() {
|
|
||||||
// let out = entry.1.with_mut(|e| {
|
|
||||||
// if e.state(cur_ts) >= min_state {
|
|
||||||
// if let Some(out) = f(entry.0, e) {
|
|
||||||
// return Some(out);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// None
|
|
||||||
// });
|
|
||||||
// if out.is_some() {
|
|
||||||
// return out;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// None
|
|
||||||
// }
|
|
||||||
|
|
||||||
fn queue_bucket_kick(&self, node_id: DHTKey) {
|
fn queue_bucket_kick(&self, node_id: DHTKey) {
|
||||||
let mut inner = self.inner.write();
|
let mut inner = self.inner.write();
|
||||||
let idx = Self::find_bucket_index(&*inner, node_id);
|
let idx = Self::find_bucket_index(&*inner, node_id);
|
||||||
|
@ -3,13 +3,14 @@ use super::*;
|
|||||||
use crate::dht::*;
|
use crate::dht::*;
|
||||||
use crate::xx::*;
|
use crate::xx::*;
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
use stop_token::future::FutureExt;
|
||||||
|
|
||||||
impl RoutingTable {
|
impl RoutingTable {
|
||||||
// Compute transfer statistics to determine how 'fast' a node is
|
// Compute transfer statistics to determine how 'fast' a node is
|
||||||
#[instrument(level = "trace", skip(self), err)]
|
#[instrument(level = "trace", skip(self), err)]
|
||||||
pub(super) async fn rolling_transfers_task_routine(
|
pub(super) async fn rolling_transfers_task_routine(
|
||||||
self,
|
self,
|
||||||
stop_token: StopToken,
|
_stop_token: StopToken,
|
||||||
last_ts: u64,
|
last_ts: u64,
|
||||||
cur_ts: u64,
|
cur_ts: u64,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
@ -196,6 +197,46 @@ impl RoutingTable {
|
|||||||
Ok(bsmap)
|
Ok(bsmap)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM
|
||||||
|
async fn direct_bootstrap_task_routine(
|
||||||
|
self,
|
||||||
|
stop_token: StopToken,
|
||||||
|
bootstrap_dialinfos: Vec<DialInfo>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let network_manager = self.network_manager();
|
||||||
|
|
||||||
|
let mut unord = FuturesUnordered::new();
|
||||||
|
for bootstrap_di in bootstrap_dialinfos {
|
||||||
|
let peer_info = match network_manager.boot_request(bootstrap_di).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
error!("BOOT request failed: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Got peer info, let's add it to the routing table
|
||||||
|
for pi in peer_info {
|
||||||
|
let k = pi.node_id.key;
|
||||||
|
// Register the node
|
||||||
|
let nr = self
|
||||||
|
.register_node_with_signed_node_info(k, pi.signed_node_info)
|
||||||
|
.map_err(logthru_rtab!(error "Couldn't add bootstrap node: {}", k))?;
|
||||||
|
|
||||||
|
// Add this our futures to process in parallel
|
||||||
|
unord.push(
|
||||||
|
// lets ask bootstrap to find ourselves now
|
||||||
|
self.reverse_find_node(nr, true),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all bootstrap operations to complete before we complete the singlefuture
|
||||||
|
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", skip(self), err)]
|
#[instrument(level = "trace", skip(self), err)]
|
||||||
pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> Result<(), String> {
|
pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> Result<(), String> {
|
||||||
let (bootstrap, bootstrap_nodes) = {
|
let (bootstrap, bootstrap_nodes) = {
|
||||||
@ -208,8 +249,22 @@ impl RoutingTable {
|
|||||||
|
|
||||||
log_rtab!(debug "--- bootstrap_task");
|
log_rtab!(debug "--- bootstrap_task");
|
||||||
|
|
||||||
// If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s)
|
// See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism
|
||||||
|
if !bootstrap_nodes.is_empty() {
|
||||||
|
let mut bootstrap_dialinfos = Vec::<DialInfo>::new();
|
||||||
|
for b in &bootstrap {
|
||||||
|
if let Ok(bootstrap_di) = DialInfo::from_str(&b) {
|
||||||
|
bootstrap_dialinfos.push(bootstrap_di);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if bootstrap_dialinfos.len() > 0 {
|
||||||
|
return self
|
||||||
|
.direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s)
|
||||||
let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() {
|
let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() {
|
||||||
let mut bsmap = BootstrapRecordMap::new();
|
let mut bsmap = BootstrapRecordMap::new();
|
||||||
let mut bootstrap_node_dial_infos = Vec::new();
|
let mut bootstrap_node_dial_infos = Vec::new();
|
||||||
@ -290,7 +345,7 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all bootstrap operations to complete before we complete the singlefuture
|
// Wait for all bootstrap operations to complete before we complete the singlefuture
|
||||||
while unord.next().await.is_some() {}
|
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,7 +366,7 @@ impl RoutingTable {
|
|||||||
{
|
{
|
||||||
let inner = self.inner.read();
|
let inner = self.inner.read();
|
||||||
|
|
||||||
Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
||||||
if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) {
|
if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) {
|
||||||
let nr = NodeRef::new(self.clone(), k, v, None);
|
let nr = NodeRef::new(self.clone(), k, v, None);
|
||||||
unord.push(MustJoinHandle::new(intf::spawn_local(
|
unord.push(MustJoinHandle::new(intf::spawn_local(
|
||||||
@ -323,7 +378,7 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for futures to complete
|
// Wait for futures to complete
|
||||||
while unord.next().await.is_some() {}
|
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -340,7 +395,7 @@ impl RoutingTable {
|
|||||||
let inner = self.inner.read();
|
let inner = self.inner.read();
|
||||||
let mut noderefs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
let mut noderefs = Vec::<NodeRef>::with_capacity(inner.bucket_entry_count);
|
||||||
let cur_ts = intf::get_timestamp();
|
let cur_ts = intf::get_timestamp();
|
||||||
Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
||||||
noderefs.push(NodeRef::new(self.clone(), k, v, None));
|
noderefs.push(NodeRef::new(self.clone(), k, v, None));
|
||||||
Option::<()>::None
|
Option::<()>::None
|
||||||
});
|
});
|
||||||
@ -353,7 +408,7 @@ impl RoutingTable {
|
|||||||
log_rtab!("--- peer minimum search with {:?}", nr);
|
log_rtab!("--- peer minimum search with {:?}", nr);
|
||||||
unord.push(self.reverse_find_node(nr, false));
|
unord.push(self.reverse_find_node(nr, false));
|
||||||
}
|
}
|
||||||
while unord.next().await.is_some() {}
|
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user