diff --git a/Cargo.lock b/Cargo.lock index 605bb8a6..1668dc55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -378,6 +378,7 @@ dependencies = [ "blanket", "futures-core", "futures-task", + "futures-timer", "futures-util", "pin-project 1.0.11", "rustc_version", @@ -1787,6 +1788,16 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +dependencies = [ + "gloo-timers", + "send_wrapper 0.4.0", +] + [[package]] name = "futures-util" version = "0.3.21" @@ -3869,6 +3880,12 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2333e6df6d6598f2b1974829f853c2b4c5f4a6e503c10af918081aa6f8564e1" +[[package]] +name = "send_wrapper" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" + [[package]] name = "send_wrapper" version = "0.5.0" @@ -3880,6 +3897,9 @@ name = "send_wrapper" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" +dependencies = [ + "futures-core", +] [[package]] name = "serde" diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index cdda10dc..a3b2ae70 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -104,9 +104,9 @@ serde_cbor = { version = "^0", default-features = false, features = ["alloc"] } serde_json = { version = "^1", default-features = false, features = ["alloc"] } getrandom = { version = "^0", features = ["js"] } ws_stream_wasm = "^0" -async_executors = { version = "^0", default-features = false, features = [ "bindgen" ]} +async_executors = { version = "^0", default-features = false, features = [ "bindgen", "timer" ]} async-lock = "^2" -send_wrapper = "^0" +send_wrapper = { version = "^0", features = ["futures"] } wasm-logger = "^0" tracing-wasm = "^0" diff --git a/veilid-core/src/api_tracing_layer.rs b/veilid-core/src/api_tracing_layer.rs index 911925c3..c2d6d3a5 100644 --- a/veilid-core/src/api_tracing_layer.rs +++ b/veilid-core/src/api_tracing_layer.rs @@ -5,18 +5,8 @@ use core::fmt::Write; use once_cell::sync::OnceCell; use tracing_subscriber::*; -cfg_if! { - if #[cfg(target_arch = "wasm32")] { - use send_wrapper::*; - - struct ApiLoggerInner { - update_callback: SendWrapper, - } - } else { - struct ApiLoggerInner { - update_callback: UpdateCallback, - } - } +struct ApiLoggerInner { + update_callback: UpdateCallback, } #[derive(Clone)] @@ -28,17 +18,7 @@ static API_LOGGER: OnceCell = OnceCell::new(); impl ApiTracingLayer { fn new_inner(update_callback: UpdateCallback) -> ApiLoggerInner { - cfg_if! { - if #[cfg(target_arch = "wasm32")] { - ApiLoggerInner { - update_callback: SendWrapper::new(update_callback), - } - } else { - ApiLoggerInner { - update_callback, - } - } - } + ApiLoggerInner { update_callback } } #[instrument(level = "debug", skip(update_callback))] diff --git a/veilid-core/src/callback_state_machine.rs b/veilid-core/src/callback_state_machine.rs index aa9ebe3a..64ab36c1 100644 --- a/veilid-core/src/callback_state_machine.rs +++ b/veilid-core/src/callback_state_machine.rs @@ -1,21 +1,9 @@ use crate::xx::*; pub use rust_fsm::*; -cfg_if! { - if #[cfg(target_arch = "wasm32")] { - pub type StateChangeCallback = Arc< - dyn Fn(::State, ::State) - + 'static, - >; - } else { - pub type StateChangeCallback = Arc< - dyn Fn(::State, ::State) - + Send - + Sync - + 'static, - >; - } -} +pub type StateChangeCallback = Arc< + dyn Fn(::State, ::State) + Send + Sync + 'static, +>; struct CallbackStateMachineInner where diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs index bd408f7b..f1d34442 100644 --- a/veilid-core/src/core_context.rs +++ b/veilid-core/src/core_context.rs @@ -5,13 +5,7 @@ use crate::veilid_api::*; use crate::veilid_config::*; use crate::xx::*; -// cfg_if! { -// if #[cfg(target_arch = "wasm32")] { -// pub type UpdateCallback = Arc; -// } else { pub type UpdateCallback = Arc; -// } -// } struct ServicesContext { pub config: VeilidConfig, diff --git a/veilid-core/src/intf/native/system.rs b/veilid-core/src/intf/native/system.rs index e51ddb02..2963d4db 100644 --- a/veilid-core/src/intf/native/system.rs +++ b/veilid-core/src/intf/native/system.rs @@ -11,10 +11,10 @@ pub fn get_timestamp() -> u64 { } } -pub fn get_timestamp_string() -> String { - let dt = chrono::Utc::now(); - dt.time().format("%H:%M:%S.3f").to_string() -} +// pub fn get_timestamp_string() -> String { +// let dt = chrono::Utc::now(); +// dt.time().format("%H:%M:%S.3f").to_string() +// } pub fn random_bytes(dest: &mut [u8]) -> EyreResult<()> { let mut rng = rand::thread_rng(); @@ -83,26 +83,26 @@ where } } -pub fn spawn_with_local_set( - future: impl Future + Send + 'static, -) -> MustJoinHandle -where - Out: Send + 'static, -{ - cfg_if! { - if #[cfg(feature="rt-async-std")] { - spawn(future) - } else if #[cfg(feature="rt-tokio")] { - MustJoinHandle::new(tokio::task::spawn_blocking(move || { - let rt = tokio::runtime::Handle::current(); - rt.block_on(async { - let local = tokio::task::LocalSet::new(); - local.run_until(future).await - }) - })) - } - } -} +// pub fn spawn_with_local_set( +// future: impl Future + Send + 'static, +// ) -> MustJoinHandle +// where +// Out: Send + 'static, +// { +// cfg_if! { +// if #[cfg(feature="rt-async-std")] { +// spawn(future) +// } else if #[cfg(feature="rt-tokio")] { +// MustJoinHandle::new(tokio::task::spawn_blocking(move || { +// let rt = tokio::runtime::Handle::current(); +// rt.block_on(async { +// let local = tokio::task::LocalSet::new(); +// local.run_until(future).await +// }) +// })) +// } +// } +// } pub fn spawn_detached(future: impl Future + Send + 'static) where diff --git a/veilid-core/src/intf/wasm/system.rs b/veilid-core/src/intf/wasm/system.rs index a56c50ce..5c93d416 100644 --- a/veilid-core/src/intf/wasm/system.rs +++ b/veilid-core/src/intf/wasm/system.rs @@ -1,11 +1,11 @@ use super::utils; use crate::xx::*; use crate::*; -use async_executors::{Bindgen, LocalSpawnHandleExt /*, SpawnHandleExt*/}; +use async_executors::{Bindgen, LocalSpawnHandleExt, SpawnHandleExt, Timer}; use futures_util::future::{select, Either}; use js_sys::*; use wasm_bindgen_futures::*; -use web_sys::*; +//use web_sys::*; #[wasm_bindgen] extern "C" { @@ -26,17 +26,17 @@ pub fn get_timestamp() -> u64 { } } -pub fn get_timestamp_string() -> String { - let date = Date::new_0(); - let hours = Date::get_utc_hours(&date); - let minutes = Date::get_utc_minutes(&date); - let seconds = Date::get_utc_seconds(&date); - let milliseconds = Date::get_utc_milliseconds(&date); - format!( - "{:02}:{:02}:{:02}.{}", - hours, minutes, seconds, milliseconds - ) -} +// pub fn get_timestamp_string() -> String { +// let date = Date::new_0(); +// let hours = Date::get_utc_hours(&date); +// let minutes = Date::get_utc_minutes(&date); +// let seconds = Date::get_utc_seconds(&date); +// let milliseconds = Date::get_utc_milliseconds(&date); +// format!( +// "{:02}:{:02}:{:02}.{}", +// hours, minutes, seconds, milliseconds +// ) +// } pub fn random_bytes(dest: &mut [u8]) -> EyreResult<()> { let len = dest.len(); @@ -72,43 +72,21 @@ pub fn get_random_u64() -> u64 { } pub async fn sleep(millis: u32) { - if utils::is_browser() { - let wait_millis = if millis > u32::MAX { - i32::MAX - } else { - millis as i32 - }; - let promise = Promise::new(&mut |yes, _| { - let win = window().unwrap(); - win.set_timeout_with_callback_and_timeout_and_arguments_0(&yes, wait_millis) - .unwrap(); - }); - - JsFuture::from(promise).await.unwrap(); - } else if utils::is_nodejs() { - let promise = Promise::new(&mut |yes, _| { - nodejs_global_set_timeout_with_callback_and_timeout_and_arguments_0(&yes, millis) - .unwrap(); - }); - - JsFuture::from(promise).await.unwrap(); - } else { - panic!("WASM requires browser or nodejs environment"); - } + Bindgen.sleep(Duration::from_millis(millis.into())).await } pub fn system_boxed<'a, Out>( - future: impl Future + 'a, + future: impl Future + Send + 'a, ) -> SystemPinBoxFutureLifetime<'a, Out> { Box::pin(future) } -pub fn spawn(future: impl Future + 'static) -> MustJoinHandle +pub fn spawn(future: impl Future + Send + 'static) -> MustJoinHandle where Out: Send + 'static, { MustJoinHandle::new(Bindgen - .spawn_handle_local(future) + .spawn_handle(future) .expect("wasm-bindgen-futures spawn should never error out")) } @@ -121,16 +99,16 @@ where .expect("wasm-bindgen-futures spawn_local should never error out")) } -pub fn spawn_with_local_set( - future: impl Future + 'static, -) -> MustJoinHandle -where - Out: Send + 'static, -{ - spawn(future) -} +// pub fn spawn_with_local_set( +// future: impl Future + Send + 'static, +// ) -> MustJoinHandle +// where +// Out: Send + 'static, +// { +// spawn(future) +// } -pub fn spawn_detached(future: impl Future + 'static) +pub fn spawn_detached(future: impl Future + Send + 'static) where Out: Send + 'static, { @@ -142,13 +120,13 @@ where pub fn interval(freq_ms: u32, callback: F) -> SystemPinBoxFuture<()> where - F: Fn() -> FUT + 'static, - FUT: Future, + F: Fn() -> FUT + Send + Sync + 'static, + FUT: Future + Send, { let e = Eventual::new(); let ie = e.clone(); - let jh = spawn_local(Box::pin(async move { + let jh = spawn(Box::pin(async move { while timeout(freq_ms, ie.instance_clone(())).await.is_err() { callback().await; } diff --git a/veilid-core/src/intf/wasm/table_store.rs b/veilid-core/src/intf/wasm/table_store.rs index f83e38ef..2e0aaa03 100644 --- a/veilid-core/src/intf/wasm/table_store.rs +++ b/veilid-core/src/intf/wasm/table_store.rs @@ -13,6 +13,7 @@ struct TableStoreInner { pub struct TableStore { config: VeilidConfig, inner: Arc>, + async_lock: Arc>, } impl TableStore { @@ -25,14 +26,17 @@ impl TableStore { Self { config, inner: Arc::new(Mutex::new(Self::new_inner())), + async_lock: Arc::new(AsyncMutex::new(())), } } pub async fn init(&self) -> EyreResult<()> { + let _async_guard = self.async_lock.lock().await; Ok(()) } pub async fn terminate(&self) { + let _async_guard = self.async_lock.lock().await; assert!( self.inner.lock().opened.len() == 0, "all open databases should have been closed" @@ -66,18 +70,21 @@ impl TableStore { } pub async fn open(&self, name: &str, column_count: u32) -> EyreResult { + let _async_guard = self.async_lock.lock().await; let table_name = self.get_table_name(name)?; - let mut inner = self.inner.lock(); - if let Some(table_db_weak_inner) = inner.opened.get(&table_name) { - match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) { - Some(tdb) => { - return Ok(tdb); - } - None => { - inner.opened.remove(&table_name); - } - }; + { + let mut inner = self.inner.lock(); + if let Some(table_db_weak_inner) = inner.opened.get(&table_name) { + match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) { + Some(tdb) => { + return Ok(tdb); + } + None => { + inner.opened.remove(&table_name); + } + }; + } } let db = Database::open(table_name.clone(), column_count) .await @@ -86,22 +93,28 @@ impl TableStore { let table_db = TableDB::new(table_name.clone(), self.clone(), db); - inner.opened.insert(table_name, table_db.weak_inner()); + { + let mut inner = self.inner.lock(); + inner.opened.insert(table_name, table_db.weak_inner()); + } Ok(table_db) } pub async fn delete(&self, name: &str) -> EyreResult { + let _async_guard = self.async_lock.lock().await; trace!("TableStore::delete {}", name); let table_name = self.get_table_name(name)?; - let inner = self.inner.lock(); - if inner.opened.contains_key(&table_name) { - trace!( - "TableStore::delete {}: Not deleting, still open.", - table_name - ); - bail!("Not deleting table that is still opened"); + { + let inner = self.inner.lock(); + if inner.opened.contains_key(&table_name) { + trace!( + "TableStore::delete {}: Not deleting, still open.", + table_name + ); + bail!("Not deleting table that is still opened"); + } } if utils::is_nodejs() { diff --git a/veilid-core/src/intf/wasm/utils/mod.rs b/veilid-core/src/intf/wasm/utils/mod.rs index dc8be425..5a4f853f 100644 --- a/veilid-core/src/intf/wasm/utils/mod.rs +++ b/veilid-core/src/intf/wasm/utils/mod.rs @@ -44,39 +44,39 @@ pub fn is_browser() -> bool { res } -pub fn is_browser_https() -> bool { - static CACHE: AtomicI8 = AtomicI8::new(-1); - let cache = CACHE.load(Ordering::Relaxed); - if cache != -1 { - return cache != 0; - } +// pub fn is_browser_https() -> bool { +// static CACHE: AtomicI8 = AtomicI8::new(-1); +// let cache = CACHE.load(Ordering::Relaxed); +// if cache != -1 { +// return cache != 0; +// } - let res = js_sys::eval("window.location.protocol === 'https'") - .map(|res| res.is_truthy()) - .unwrap_or_default(); +// let res = js_sys::eval("window.location.protocol === 'https'") +// .map(|res| res.is_truthy()) +// .unwrap_or_default(); - CACHE.store(res as i8, Ordering::Relaxed); +// CACHE.store(res as i8, Ordering::Relaxed); - res -} +// res +// } -pub fn node_require(module: &str) -> JsValue { - if !is_nodejs() { - return JsValue::UNDEFINED; - } +// pub fn node_require(module: &str) -> JsValue { +// if !is_nodejs() { +// return JsValue::UNDEFINED; +// } - let mut home = env!("CARGO_MANIFEST_DIR"); - if home.len() == 0 { - home = "."; - } +// let mut home = env!("CARGO_MANIFEST_DIR"); +// if home.len() == 0 { +// home = "."; +// } - match js_sys::eval(format!("require(\"{}/{}\")", home, module).as_str()) { - Ok(v) => v, - Err(e) => { - panic!("node_require failed: {:?}", e); - } - } -} +// match js_sys::eval(format!("require(\"{}/{}\")", home, module).as_str()) { +// Ok(v) => v, +// Err(e) => { +// panic!("node_require failed: {:?}", e); +// } +// } +// } #[derive(ThisError, Debug, Clone, Eq, PartialEq)] #[error("JsValue error")] diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index cd0dcfe2..fdd32942 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -333,6 +333,7 @@ impl ConnectionManager { // Called by low-level network when any connection-oriented protocol connection appears // either from incoming connections. + #[cfg_attr(target_os = "wasm32", allow(dead_code))] pub(super) async fn on_accepted_protocol_network_connection( &self, conn: ProtocolNetworkConnection, diff --git a/veilid-core/src/network_manager/wasm/protocol/mod.rs b/veilid-core/src/network_manager/wasm/protocol/mod.rs index fe88d2e5..92734266 100644 --- a/veilid-core/src/network_manager/wasm/protocol/mod.rs +++ b/veilid-core/src/network_manager/wasm/protocol/mod.rs @@ -7,6 +7,7 @@ use std::io; #[derive(Debug)] pub enum ProtocolNetworkConnection { + #[allow(dead_code)] Dummy(DummyNetworkConnection), Ws(ws::WebsocketNetworkConnection), //WebRTC(wrtc::WebRTCNetworkConnection), diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 2699f8fc..db23d0b6 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -2,9 +2,10 @@ use super::*; use futures_util::{SinkExt, StreamExt}; use std::io; use ws_stream_wasm::*; +use send_wrapper::*; struct WebsocketNetworkConnectionInner { - ws_meta: WsMeta, + _ws_meta: WsMeta, ws_stream: CloneStream, } @@ -29,7 +30,7 @@ impl WebsocketNetworkConnection { Self { descriptor, inner: Arc::new(WebsocketNetworkConnectionInner { - ws_meta, + _ws_meta: ws_meta, ws_stream: CloneStream::new(ws_stream), }), } @@ -59,7 +60,7 @@ impl WebsocketNetworkConnection { #[instrument(level = "trace", err, skip(self), fields(ret.len))] pub async fn recv(&self) -> io::Result> { - let out = match self.inner.ws_stream.clone().next().await { + let out = match SendWrapper::new(self.inner.ws_stream.clone().next()).await { Some(WsMessage::Binary(v)) => v, Some(x) => { bail_io_error_other!("Unexpected WS message type"); @@ -100,7 +101,8 @@ impl WebsocketProtocolHandler { bail_io_error_other!("invalid websocket url scheme"); } - let (wsmeta, wsio) = WsMeta::connect(request, None).await.map_err(to_io)?; + let fut = spawn_local(WsMeta::connect(request, None)); + let (wsmeta, wsio) = fut.await.map_err(to_io)?; // Make our connection descriptor Ok(ProtocolNetworkConnection::Ws( diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index f7ad684f..d046b7f4 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -15,57 +15,28 @@ pub enum ReceiptEvent { Cancelled, } -cfg_if! { - if #[cfg(target_arch = "wasm32")] { - pub trait ReceiptCallback: 'static { - fn call( - &self, - event: ReceiptEvent, - receipt: Receipt, - returns_so_far: u32, - expected_returns: u32, - ) -> SystemPinBoxFuture<()>; - } - impl ReceiptCallback for T - where - T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + 'static, - F: Future + 'static, - { - fn call( - &self, - event: ReceiptEvent, - receipt: Receipt, - returns_so_far: u32, - expected_returns: u32, - ) -> SystemPinBoxFuture<()> { - Box::pin(self(event, receipt, returns_so_far, expected_returns)) - } - } - } else { - pub trait ReceiptCallback: Send + 'static { - fn call( - &self, - event: ReceiptEvent, - receipt: Receipt, - returns_so_far: u32, - expected_returns: u32, - ) -> SystemPinBoxFuture<()>; - } - impl ReceiptCallback for T - where - T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + Send + 'static, - F: Future + Send + 'static - { - fn call( - &self, - event: ReceiptEvent, - receipt: Receipt, - returns_so_far: u32, - expected_returns: u32, - ) -> SystemPinBoxFuture<()> { - Box::pin(self(event, receipt, returns_so_far, expected_returns)) - } - } +pub trait ReceiptCallback: Send + 'static { + fn call( + &self, + event: ReceiptEvent, + receipt: Receipt, + returns_so_far: u32, + expected_returns: u32, + ) -> SystemPinBoxFuture<()>; +} +impl ReceiptCallback for T +where + T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + Send + 'static, + F: Future + Send + 'static, +{ + fn call( + &self, + event: ReceiptEvent, + receipt: Receipt, + returns_so_far: u32, + expected_returns: u32, + ) -> SystemPinBoxFuture<()> { + Box::pin(self(event, receipt, returns_so_far, expected_returns)) } } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 52db566f..c911b294 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1593,15 +1593,9 @@ pub struct PeerStats { pub status: Option, // Last known node status } -cfg_if! { - if #[cfg(target_arch = "wasm32")] { - pub type ValueChangeCallback = - Arc) -> SystemPinBoxFuture<()> + 'static>; - } else { - pub type ValueChangeCallback = - Arc) -> SystemPinBoxFuture<()> + Send + Sync + 'static>; - } -} +pub type ValueChangeCallback = + Arc) -> SystemPinBoxFuture<()> + Send + Sync + 'static>; + ///////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index 304cb51c..ac8df820 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -3,16 +3,8 @@ use crate::*; use serde::*; //////////////////////////////////////////////////////////////////////////////////////////////// -cfg_if! { - if #[cfg(target_arch = "wasm32")] { - pub type ConfigCallbackReturn = Result, VeilidAPIError>; - pub type ConfigCallback = Arc ConfigCallbackReturn>; - - } else { - pub type ConfigCallbackReturn = Result, VeilidAPIError>; - pub type ConfigCallback = Arc ConfigCallbackReturn + Send + Sync>; - } -} +pub type ConfigCallbackReturn = Result, VeilidAPIError>; +pub type ConfigCallback = Arc ConfigCallbackReturn + Send + Sync>; #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigHTTPS { diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index 434a926a..a461c74b 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -69,8 +69,8 @@ cfg_if! { pub use async_lock::Mutex as AsyncMutex; pub use async_lock::MutexGuard as AsyncMutexGuard; pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; - pub type SystemPinBoxFuture = PinBox + 'static>; - pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + 'a>; + pub type SystemPinBoxFuture = PinBox + Send + 'static>; + pub type SystemPinBoxFutureLifetime<'a, T> = PinBox + Send + 'a>; pub use async_executors::JoinHandle as LowLevelJoinHandle; } else { pub use std::string::String; diff --git a/veilid-core/src/xx/must_join_single_future.rs b/veilid-core/src/xx/must_join_single_future.rs index 4687fd24..8830c1d3 100644 --- a/veilid-core/src/xx/must_join_single_future.rs +++ b/veilid-core/src/xx/must_join_single_future.rs @@ -125,88 +125,81 @@ where } // Possibly spawn the future possibly returning the value of the last execution - cfg_if! { - if #[cfg(target_arch = "wasm32")] { - pub async fn single_spawn( - &self, - future: impl Future + 'static, - ) -> Result<(Option,bool), ()> { - let mut out: Option = None; + pub async fn single_spawn_local( + &self, + future: impl Future + 'static, + ) -> Result<(Option, bool), ()> { + let mut out: Option = None; - // See if we have a result we can return - let maybe_jh = match self.try_lock() { - Ok(v) => v, - Err(_) => { - // If we are already polling somewhere else, don't hand back a result - return Err(()); - } - }; - let mut run = true; + // See if we have a result we can return + let maybe_jh = match self.try_lock() { + Ok(v) => v, + Err(_) => { + // If we are already polling somewhere else, don't hand back a result + return Err(()); + } + }; + let mut run = true; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); + if maybe_jh.is_some() { + let mut jh = maybe_jh.unwrap(); - // See if we finished, if so, return the value of the last execution - if let Poll::Ready(r) = poll!(&mut jh) { - out = Some(r); - // Task finished, unlock with a new task - } else { - // Still running, don't run again, unlock with the current join handle - run = false; - self.unlock(Some(jh)); - } - } - - // Run if we should do that - if run { - self.unlock(Some(intf::spawn_local(future))); - } - - // Return the prior result if we have one - Ok((out, run)) + // See if we finished, if so, return the value of the last execution + if let Poll::Ready(r) = poll!(&mut jh) { + out = Some(r); + // Task finished, unlock with a new task + } else { + // Still running, don't run again, unlock with the current join handle + run = false; + self.unlock(Some(jh)); } } + + // Run if we should do that + if run { + self.unlock(Some(intf::spawn_local(future))); + } + + // Return the prior result if we have one + Ok((out, run)) } } -cfg_if! { - if #[cfg(not(target_arch = "wasm32"))] { - impl MustJoinSingleFuture - where - T: 'static + Send, - { - pub async fn single_spawn( - &self, - future: impl Future + Send + 'static, - ) -> Result<(Option, bool), ()> { - let mut out: Option = None; - // See if we have a result we can return - let maybe_jh = match self.try_lock() { - Ok(v) => v, - Err(_) => { - // If we are already polling somewhere else, don't hand back a result - return Err(()); - } - }; - let mut run = true; - if maybe_jh.is_some() { - let mut jh = maybe_jh.unwrap(); - // See if we finished, if so, return the value of the last execution - if let Poll::Ready(r) = poll!(&mut jh) { - out = Some(r); - // Task finished, unlock with a new task - } else { - // Still running, don't run again, unlock with the current join handle - run = false; - self.unlock(Some(jh)); - } - } - // Run if we should do that - if run { - self.unlock(Some(intf::spawn(future))); - } - // Return the prior result if we have one - Ok((out, run)) + +impl MustJoinSingleFuture +where + T: 'static + Send, +{ + pub async fn single_spawn( + &self, + future: impl Future + Send + 'static, + ) -> Result<(Option, bool), ()> { + let mut out: Option = None; + // See if we have a result we can return + let maybe_jh = match self.try_lock() { + Ok(v) => v, + Err(_) => { + // If we are already polling somewhere else, don't hand back a result + return Err(()); + } + }; + let mut run = true; + if maybe_jh.is_some() { + let mut jh = maybe_jh.unwrap(); + // See if we finished, if so, return the value of the last execution + if let Poll::Ready(r) = poll!(&mut jh) { + out = Some(r); + // Task finished, unlock with a new task + } else { + // Still running, don't run again, unlock with the current join handle + run = false; + self.unlock(Some(jh)); } } + // Run if we should do that + if run { + self.unlock(Some(intf::spawn(future))); + } + // Return the prior result if we have one + Ok((out, run)) } } diff --git a/veilid-core/src/xx/tick_task.rs b/veilid-core/src/xx/tick_task.rs index 1b8ed84d..9ac821d1 100644 --- a/veilid-core/src/xx/tick_task.rs +++ b/veilid-core/src/xx/tick_task.rs @@ -3,23 +3,13 @@ use crate::*; use core::sync::atomic::{AtomicU64, Ordering}; use once_cell::sync::OnceCell; -cfg_if! { - if #[cfg(target_arch = "wasm32")] { - type TickTaskRoutine = - dyn Fn(StopToken, u64, u64) -> PinBoxFuture> + 'static; - } else { - type TickTaskRoutine = - dyn Fn(StopToken, u64, u64) -> SendPinBoxFuture> + Send + Sync + 'static; - } -} +type TickTaskRoutine = + dyn Fn(StopToken, u64, u64) -> SendPinBoxFuture> + Send + Sync + 'static; /// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds. /// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again. /// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity. -pub struct TickTask< - #[cfg(target_arch = "wasm32")] E: 'static, - #[cfg(not(target_arch = "wasm32"))] E: Send + 'static, -> { +pub struct TickTask { last_timestamp_us: AtomicU64, tick_period_us: u64, routine: OnceCell>>, @@ -27,11 +17,7 @@ pub struct TickTask< single_future: MustJoinSingleFuture>, } -impl< - #[cfg(target_arch = "wasm32")] E: 'static, - #[cfg(not(target_arch = "wasm32"))] E: Send + 'static, - > TickTask -{ +impl TickTask { pub fn new_us(tick_period_us: u64) -> Self { Self { last_timestamp_us: AtomicU64::new(0), diff --git a/veilid-core/src/xx/tools.rs b/veilid-core/src/xx/tools.rs index c1f12266..8ad0cba2 100644 --- a/veilid-core/src/xx/tools.rs +++ b/veilid-core/src/xx/tools.rs @@ -167,7 +167,7 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult