wasm fixes

This commit is contained in:
John Smith 2022-10-06 11:40:55 -04:00
parent b1cc0d803c
commit e77577ba66
11 changed files with 87 additions and 59 deletions

View File

@ -74,7 +74,7 @@ fi
rustup target add aarch64-linux-android armv7-linux-androideabi i686-linux-android x86_64-linux-android wasm32-unknown-unknown rustup target add aarch64-linux-android armv7-linux-androideabi i686-linux-android x86_64-linux-android wasm32-unknown-unknown
# install cargo packages # install cargo packages
cargo install wasm-bindgen-cli cargo install wasm-bindgen-cli wasm-pack
# Ensure packages are installed # Ensure packages are installed
sudo apt-get install libc6-dev-i386 libc6:i386 libncurses5:i386 libstdc++6:i386 lib32z1 libbz2-1.0:i386 openjdk-11-jdk llvm wabt capnproto protobuf-compiler sudo apt-get install libc6-dev-i386 libc6:i386 libncurses5:i386 libstdc++6:i386 lib32z1 libbz2-1.0:i386 openjdk-11-jdk llvm wabt capnproto protobuf-compiler

View File

@ -90,13 +90,7 @@ fi
rustup target add aarch64-apple-darwin aarch64-apple-ios x86_64-apple-darwin x86_64-apple-ios wasm32-unknown-unknown aarch64-linux-android armv7-linux-androideabi i686-linux-android x86_64-linux-android rustup target add aarch64-apple-darwin aarch64-apple-ios x86_64-apple-darwin x86_64-apple-ios wasm32-unknown-unknown aarch64-linux-android armv7-linux-androideabi i686-linux-android x86_64-linux-android
# install cargo packages # install cargo packages
cargo install wasm-bindgen-cli cargo install wasm-bindgen-cli wasm-pack
# install bitcode compatible ios toolchain
# echo Manual Step:
# echo install +ios-arm64-1.57.0 toolchain for bitcode from https://github.com/getditto/rust-bitcode/releases/latest and unzip
# echo xattr -d -r com.apple.quarantine .
# echo ./install.sh
# ensure we have command line tools # ensure we have command line tools
xcode-select --install xcode-select --install

View File

@ -251,7 +251,8 @@ impl ConnectionManager {
// Async lock on the remote address for atomicity per remote // Async lock on the remote address for atomicity per remote
let peer_address = dial_info.to_peer_address(); let peer_address = dial_info.to_peer_address();
let remote_addr = peer_address.to_socket_addr(); let remote_addr = peer_address.to_socket_addr();
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr);
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await;
log_net!( log_net!(
"== get_or_create_connection local_addr={:?} dial_info={:?}", "== get_or_create_connection local_addr={:?} dial_info={:?}",
@ -369,7 +370,7 @@ impl ConnectionManager {
// Called by low-level network when any connection-oriented protocol connection appears // Called by low-level network when any connection-oriented protocol connection appears
// either from incoming connections. // either from incoming connections.
#[cfg_attr(target_os = "wasm32", allow(dead_code))] #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub(super) async fn on_accepted_protocol_network_connection( pub(super) async fn on_accepted_protocol_network_connection(
&self, &self,
protocol_connection: ProtocolNetworkConnection, protocol_connection: ProtocolNetworkConnection,

View File

@ -353,7 +353,7 @@ impl NetworkManager {
let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts);
// Look up any NAT mappings we may need to try to preserve with keepalives // Look up any NAT mappings we may need to try to preserve with keepalives
let mut mapped_port_info = routing_table.get_mapped_port_info(); let mut mapped_port_info = routing_table.get_low_level_port_info();
// Get the PublicInternet relay if we are using one // Get the PublicInternet relay if we are using one
let opt_relay_nr = routing_table.relay_node(RoutingDomain::PublicInternet); let opt_relay_nr = routing_table.relay_node(RoutingDomain::PublicInternet);
@ -558,7 +558,8 @@ impl NetworkManager {
// Do we need a relay? // Do we need a relay?
if !has_relay && node_info.requires_relay() { if !has_relay && node_info.requires_relay() {
// Do we need an outbound relay? // Do we want an outbound relay?
let mut got_outbound_relay = false;
if network_class.outbound_wants_relay() { if network_class.outbound_wants_relay() {
// The outbound relay is the host of the PWA // The outbound relay is the host of the PWA
if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await { if let Some(outbound_relay_peerinfo) = intf::get_outbound_relay_peer().await {
@ -571,10 +572,11 @@ impl NetworkManager {
) { ) {
info!("Outbound relay node selected: {}", nr); info!("Outbound relay node selected: {}", nr);
editor.set_relay_node(nr); editor.set_relay_node(nr);
got_outbound_relay = true;
} }
} }
// Otherwise we must need an inbound relay }
} else { if !got_outbound_relay {
// Find a node in our routing table that is an acceptable inbound relay // Find a node in our routing table that is an acceptable inbound relay
if let Some(nr) = if let Some(nr) =
routing_table.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) routing_table.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts)

View File

@ -52,6 +52,7 @@ pub async fn test_add_get_remove() {
); );
let c1 = NetworkConnection::dummy(1, a1); let c1 = NetworkConnection::dummy(1, a1);
let c1b = NetworkConnection::dummy(10, a1);
let c1h = c1.get_handle(); let c1h = c1.get_handle();
let c2 = NetworkConnection::dummy(2, a2); let c2 = NetworkConnection::dummy(2, a2);
let c3 = NetworkConnection::dummy(3, a3); let c3 = NetworkConnection::dummy(3, a3);
@ -65,6 +66,7 @@ pub async fn test_add_get_remove() {
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
assert_eq!(table.get_connection_by_descriptor(a1), None); assert_eq!(table.get_connection_by_descriptor(a1), None);
table.add_connection(c1).unwrap(); table.add_connection(c1).unwrap();
assert!(table.add_connection(c1b).is_err());
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert!(table.remove_connection_by_id(4).is_none()); assert!(table.remove_connection_by_id(4).is_none());

View File

@ -16,7 +16,10 @@ struct NetworkInner {
} }
struct NetworkUnlockedInner { struct NetworkUnlockedInner {
// Accessors
routing_table: RoutingTable,
network_manager: NetworkManager, network_manager: NetworkManager,
connection_manager: ConnectionManager,
} }
#[derive(Clone)] #[derive(Clone)]
@ -35,9 +38,15 @@ impl Network {
} }
} }
fn new_unlocked_inner(network_manager: NetworkManager) -> NetworkUnlockedInner { fn new_unlocked_inner(
network_manager: NetworkManager,
routing_table: RoutingTable,
connection_manager: ConnectionManager,
) -> NetworkUnlockedInner {
NetworkUnlockedInner { NetworkUnlockedInner {
network_manager network_manager,
routing_table,
connection_manager
} }
} }
@ -49,15 +58,18 @@ impl Network {
Self { Self {
config: network_manager.config(), config: network_manager.config(),
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(network_manager)) unlocked_inner: Arc::new(Self::new_unlocked_inner(network_manager, routing_table, connection_manager))
} }
} }
fn network_manager(&self) -> NetworkManager { fn network_manager(&self) -> NetworkManager {
self.unlocked_inner.network_manager.clone() self.unlocked_inner.network_manager.clone()
} }
fn routing_table(&self) -> RoutingTable {
self.unlocked_inner.routing_table.clone()
}
fn connection_manager(&self) -> ConnectionManager { fn connection_manager(&self) -> ConnectionManager {
self.unlocked_inner.network_manager.connection_manager() self.unlocked_inner.connection_manager.clone()
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
@ -279,8 +291,7 @@ impl Network {
trace!("stopping network"); trace!("stopping network");
// Reset state // Reset state
let network_manager = self.network_manager(); let routing_table = self.routing_table();
let routing_table = network_manager.routing_table();
// Drop all dial info // Drop all dial info
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
@ -299,7 +310,7 @@ impl Network {
trace!("network stopped"); trace!("network stopped");
} }
pub fn is_usable_interface_address(&self, addr: IpAddr) -> bool { pub fn is_usable_interface_address(&self, _addr: IpAddr) -> bool {
false false
} }

View File

@ -112,7 +112,7 @@ impl WebsocketProtocolHandler {
timeout_ms: u32, timeout_ms: u32,
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> { ) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
// Split dial info up // Split dial info up
let (tls, scheme) = match dial_info { let (_tls, scheme) = match dial_info {
DialInfo::WS(_) => (false, "ws"), DialInfo::WS(_) => (false, "ws"),
DialInfo::WSS(_) => (true, "wss"), DialInfo::WSS(_) => (true, "wss"),
_ => panic!("invalid dialinfo for WS/WSS protocol"), _ => panic!("invalid dialinfo for WS/WSS protocol"),

View File

@ -7,7 +7,7 @@ use crate::*;
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MappedPortInfo { pub struct LowLevelPortInfo {
pub low_level_protocol_ports: LowLevelProtocolPorts, pub low_level_protocol_ports: LowLevelProtocolPorts,
pub protocol_to_port: ProtocolToPortMapping, pub protocol_to_port: ProtocolToPortMapping,
} }
@ -389,7 +389,7 @@ impl RoutingTable {
// Only one protocol per low level protocol/port combination is required // Only one protocol per low level protocol/port combination is required
// For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required // For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required
// and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too. // and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too.
pub fn get_mapped_port_info(&self) -> MappedPortInfo { pub fn get_low_level_port_info(&self) -> LowLevelPortInfo {
let mut low_level_protocol_ports = let mut low_level_protocol_ports =
BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new(); BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new();
let mut protocol_to_port = let mut protocol_to_port =
@ -412,7 +412,7 @@ impl RoutingTable {
), ),
); );
} }
MappedPortInfo { LowLevelPortInfo {
low_level_protocol_ports, low_level_protocol_ports,
protocol_to_port, protocol_to_port,
} }
@ -423,7 +423,7 @@ impl RoutingTable {
let outbound_dif = self let outbound_dif = self
.network_manager() .network_manager()
.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); .get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
let mapped_port_info = self.get_mapped_port_info(); let mapped_port_info = self.get_low_level_port_info();
move |e: &BucketEntryInner| { move |e: &BucketEntryInner| {
// Ensure this node is not on the local network // Ensure this node is not on the local network
@ -435,6 +435,9 @@ impl RoutingTable {
// as we need to be able to use the relay for keepalives for all nat mappings // as we need to be able to use the relay for keepalives for all nat mappings
let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone(); let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone();
info!("outbound_dif: {:?}", outbound_dif);
info!("low_level_protocol_ports: {:?}", low_level_protocol_ports);
let can_serve_as_relay = e let can_serve_as_relay = e
.node_info(RoutingDomain::PublicInternet) .node_info(RoutingDomain::PublicInternet)
.map(|n| { .map(|n| {

View File

@ -8,6 +8,7 @@ impl RPCProcessor {
self, self,
peer: NodeRef, peer: NodeRef,
) -> Result<NetworkResult<Answer<SenderInfo>>, RPCError> { ) -> Result<NetworkResult<Answer<SenderInfo>>, RPCError> {
info!("ping to {:?}", peer);
let routing_domain = match peer.best_routing_domain() { let routing_domain = match peer.best_routing_domain() {
Some(rd) => rd, Some(rd) => rd,
None => { None => {
@ -43,6 +44,7 @@ impl RPCProcessor {
}, },
_ => return Err(RPCError::invalid_format("not an answer")), _ => return Err(RPCError::invalid_format("not an answer")),
}; };
info!("qwer");
// Ensure the returned node status is the kind for the routing domain we asked for // Ensure the returned node status is the kind for the routing domain we asked for
match routing_domain { match routing_domain {
@ -62,6 +64,8 @@ impl RPCProcessor {
} }
} }
info!("zxzxv");
// Update latest node status in routing table // Update latest node status in routing table
peer.update_node_status(status_a.node_status); peer.update_node_status(status_a.node_status);

View File

@ -35,20 +35,20 @@ pub async fn test_simple_single_contention() {
let g1 = table.lock_tag(a1).await; let g1 = table.lock_tag(a1).await;
println!("locked"); info!("locked");
let t1 = intf::spawn(async move { let t1 = intf::spawn(async move {
// move the guard into the task // move the guard into the task
let _g1_take = g1; let _g1_take = g1;
// hold the guard for a bit // hold the guard for a bit
println!("waiting"); info!("waiting");
intf::sleep(1000).await; intf::sleep(1000).await;
// release the guard // release the guard
println!("released"); info!("released");
}); });
// wait to lock again, will contend until spawned task exits // wait to lock again, will contend until spawned task exits
let _g1_b = table.lock_tag(a1).await; let _g1_b = table.lock_tag(a1).await;
println!("locked"); info!("locked");
// Ensure task is joined // Ensure task is joined
t1.await; t1.await;
@ -67,24 +67,24 @@ pub async fn test_simple_double_contention() {
let g1 = table.lock_tag(a1).await; let g1 = table.lock_tag(a1).await;
let g2 = table.lock_tag(a2).await; let g2 = table.lock_tag(a2).await;
println!("locked"); info!("locked");
let t1 = intf::spawn(async move { let t1 = intf::spawn(async move {
// move the guard into the task // move the guard into the tas
let _g1_take = g1; let _g1_take = g1;
// hold the guard for a bit // hold the guard for a bit
println!("waiting"); info!("waiting");
intf::sleep(1000).await; intf::sleep(1000).await;
// release the guard // release the guard
println!("released"); info!("released");
}); });
let t2 = intf::spawn(async move { let t2 = intf::spawn(async move {
// move the guard into the task // move the guard into the task
let _g2_take = g2; let _g2_take = g2;
// hold the guard for a bit // hold the guard for a bit
println!("waiting"); info!("waiting");
intf::sleep(500).await; intf::sleep(500).await;
// release the guard // release the guard
println!("released"); info!("released");
}); });
// wait to lock again, will contend until spawned task exits // wait to lock again, will contend until spawned task exits
@ -92,7 +92,7 @@ pub async fn test_simple_double_contention() {
// wait to lock again, should complete immediately // wait to lock again, should complete immediately
let _g2_b = table.lock_tag(a2).await; let _g2_b = table.lock_tag(a2).await;
println!("locked"); info!("locked");
// Ensure tasks are joined // Ensure tasks are joined
t1.await; t1.await;
@ -112,36 +112,36 @@ pub async fn test_parallel_single_contention() {
let t1 = intf::spawn(async move { let t1 = intf::spawn(async move {
// lock the tag // lock the tag
let _g = table1.lock_tag(a1).await; let _g = table1.lock_tag(a1).await;
println!("locked t1"); info!("locked t1");
// hold the guard for a bit // hold the guard for a bit
println!("waiting t1"); info!("waiting t1");
intf::sleep(500).await; intf::sleep(500).await;
// release the guard // release the guard
println!("released t1"); info!("released t1");
}); });
let table2 = table.clone(); let table2 = table.clone();
let t2 = intf::spawn(async move { let t2 = intf::spawn(async move {
// lock the tag // lock the tag
let _g = table2.lock_tag(a1).await; let _g = table2.lock_tag(a1).await;
println!("locked t2"); info!("locked t2");
// hold the guard for a bit // hold the guard for a bit
println!("waiting t2"); info!("waiting t2");
intf::sleep(500).await; intf::sleep(500).await;
// release the guard // release the guard
println!("released t2"); info!("released t2");
}); });
let table3 = table.clone(); let table3 = table.clone();
let t3 = intf::spawn(async move { let t3 = intf::spawn(async move {
// lock the tag // lock the tag
let _g = table3.lock_tag(a1).await; let _g = table3.lock_tag(a1).await;
println!("locked t3"); info!("locked t3");
// hold the guard for a bit // hold the guard for a bit
println!("waiting t3"); info!("waiting t3");
intf::sleep(500).await; intf::sleep(500).await;
// release the guard // release the guard
println!("released t3"); info!("released t3");
}); });
// Ensure tasks are joined // Ensure tasks are joined

View File

@ -1,18 +1,33 @@
use super::*; use super::*;
use core::fmt::Debug;
use core::hash::Hash; use core::hash::Hash;
#[derive(Debug)]
pub struct AsyncTagLockGuard<T> pub struct AsyncTagLockGuard<T>
where where
T: Hash + Eq + Clone, T: Hash + Eq + Clone + Debug,
{ {
table: AsyncTagLockTable<T>, table: AsyncTagLockTable<T>,
tag: T, tag: T,
_guard: AsyncMutexGuardArc<()>, _guard: AsyncMutexGuardArc<()>,
} }
impl<T> AsyncTagLockGuard<T>
where
T: Hash + Eq + Clone + Debug,
{
fn new(table: AsyncTagLockTable<T>, tag: T, guard: AsyncMutexGuardArc<()>) -> Self {
Self {
table,
tag,
_guard: guard,
}
}
}
impl<T> Drop for AsyncTagLockGuard<T> impl<T> Drop for AsyncTagLockGuard<T>
where where
T: Hash + Eq + Clone, T: Hash + Eq + Clone + Debug,
{ {
fn drop(&mut self) { fn drop(&mut self) {
let mut inner = self.table.inner.lock(); let mut inner = self.table.inner.lock();
@ -33,7 +48,7 @@ where
} }
} }
#[derive(Clone)] #[derive(Clone, Debug)]
struct AsyncTagLockTableEntry { struct AsyncTagLockTableEntry {
mutex: Arc<AsyncMutex<()>>, mutex: Arc<AsyncMutex<()>>,
waiters: usize, waiters: usize,
@ -41,7 +56,7 @@ struct AsyncTagLockTableEntry {
struct AsyncTagLockTableInner<T> struct AsyncTagLockTableInner<T>
where where
T: Hash + Eq + Clone, T: Hash + Eq + Clone + Debug,
{ {
table: HashMap<T, AsyncTagLockTableEntry>, table: HashMap<T, AsyncTagLockTableEntry>,
} }
@ -49,14 +64,14 @@ where
#[derive(Clone)] #[derive(Clone)]
pub struct AsyncTagLockTable<T> pub struct AsyncTagLockTable<T>
where where
T: Hash + Eq + Clone, T: Hash + Eq + Clone + Debug,
{ {
inner: Arc<Mutex<AsyncTagLockTableInner<T>>>, inner: Arc<Mutex<AsyncTagLockTableInner<T>>>,
} }
impl<T> fmt::Debug for AsyncTagLockTable<T> impl<T> fmt::Debug for AsyncTagLockTable<T>
where where
T: Hash + Eq + Clone, T: Hash + Eq + Clone + Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AsyncTagLockTable").finish() f.debug_struct("AsyncTagLockTable").finish()
@ -65,7 +80,7 @@ where
impl<T> AsyncTagLockTable<T> impl<T> AsyncTagLockTable<T>
where where
T: Hash + Eq + Clone, T: Hash + Eq + Clone + Debug,
{ {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -111,16 +126,12 @@ where
// tokio version // tokio version
guard = mutex.lock_owned().await; guard = mutex.lock_owned().await;
} else { } else {
// async_std and wasm async_mutex version // async-std and wasm async-lock version
guard = mutex.lock_arc().await; guard = mutex.lock_arc().await;
} }
} }
// Return the locked guard // Return the locked guard
AsyncTagLockGuard { AsyncTagLockGuard::new(self.clone(), tag, guard)
table: self.clone(),
tag,
_guard: guard,
}
} }
} }