use super::*; use core::task::Poll; use futures_util::poll; #[derive(Debug)] struct MustJoinSingleFutureInner where T: 'static, { locked: bool, join_handle: Option>, } /// Spawns a single background processing task idempotently, possibly returning the return value of the previously executed background task /// This does not queue, just ensures that no more than a single copy of the task is running at a time, but allowing tasks to be retriggered #[derive(Debug, Clone)] pub struct MustJoinSingleFuture where T: 'static, { inner: Arc>>, } impl Default for MustJoinSingleFuture where T: 'static, { fn default() -> Self { Self::new() } } impl MustJoinSingleFuture where T: 'static, { pub fn new() -> Self { Self { inner: Arc::new(Mutex::new(MustJoinSingleFutureInner { locked: false, join_handle: None, })), } } fn try_lock(&self) -> Result>, ()> { let mut inner = self.inner.lock(); if inner.locked { // If already locked error out return Err(()); } inner.locked = true; // If we got the lock, return what we have for a join handle if anything Ok(inner.join_handle.take()) } fn unlock(&self, jh: Option>) { let mut inner = self.inner.lock(); assert!(inner.locked); assert!(inner.join_handle.is_none()); inner.locked = false; inner.join_handle = jh; } /// Check the result and take it if there is one pub async fn check(&self) -> Result, ()> { let mut out: Option = None; // See if we have a result we can return let maybe_jh = match self.try_lock() { Ok(v) => v, Err(_) => { // If we are already polling somewhere else, don't hand back a result return Err(()); } }; if maybe_jh.is_some() { let mut jh = maybe_jh.unwrap(); // See if we finished, if so, return the value of the last execution if let Poll::Ready(r) = poll!(&mut jh) { out = Some(r); // Task finished, unlock with nothing self.unlock(None); } else { // Still running put the join handle back so we can check on it later self.unlock(Some(jh)); } } else { // No task, unlock with nothing self.unlock(None); } // Return the prior result if we have one Ok(out) } /// Wait for the result and take it pub async fn join(&self) -> Result, ()> { let mut out: Option = None; // See if we have a result we can return let maybe_jh = match self.try_lock() { Ok(v) => v, Err(_) => { // If we are already polling somewhere else, // that's an error because you can only join // these things once return Err(()); } }; if maybe_jh.is_some() { let jh = maybe_jh.unwrap(); // Wait for return value of the last execution out = Some(jh.await); // Task finished, unlock with nothing } else { // No task, unlock with nothing } self.unlock(None); // Return the prior result if we have one Ok(out) } // Possibly spawn the future possibly returning the value of the last execution pub async fn single_spawn_local( &self, future: impl Future + 'static, ) -> Result<(Option, bool), ()> { let mut out: Option = None; // See if we have a result we can return let maybe_jh = match self.try_lock() { Ok(v) => v, Err(_) => { // If we are already polling somewhere else, don't hand back a result return Err(()); } }; let mut run = true; if maybe_jh.is_some() { let mut jh = maybe_jh.unwrap(); // See if we finished, if so, return the value of the last execution if let Poll::Ready(r) = poll!(&mut jh) { out = Some(r); // Task finished, unlock with a new task } else { // Still running, don't run again, unlock with the current join handle run = false; self.unlock(Some(jh)); } } // Run if we should do that if run { self.unlock(Some(spawn_local(future))); } // Return the prior result if we have one Ok((out, run)) } } impl MustJoinSingleFuture where T: 'static + Send, { pub async fn single_spawn( &self, future: impl Future + Send + 'static, ) -> Result<(Option, bool), ()> { let mut out: Option = None; // See if we have a result we can return let maybe_jh = match self.try_lock() { Ok(v) => v, Err(_) => { // If we are already polling somewhere else, don't hand back a result return Err(()); } }; let mut run = true; if maybe_jh.is_some() { let mut jh = maybe_jh.unwrap(); // See if we finished, if so, return the value of the last execution if let Poll::Ready(r) = poll!(&mut jh) { out = Some(r); // Task finished, unlock with a new task } else { // Still running, don't run again, unlock with the current join handle run = false; self.unlock(Some(jh)); } } // Run if we should do that if run { self.unlock(Some(spawn(future))); } // Return the prior result if we have one Ok((out, run)) } }