veilid/veilid-tools/src/tick_task.rs

146 lines
5.7 KiB
Rust
Raw Normal View History

2021-11-22 16:28:30 +00:00
use super::*;
2022-11-27 02:37:23 +00:00
2021-11-22 16:28:30 +00:00
use core::sync::atomic::{AtomicU64, Ordering};
use once_cell::sync::OnceCell;
2022-07-12 12:02:22 +00:00
/// Signature of the callback a `TickTask` invokes on each tick.
///
/// The callback receives a `StopToken` and two `u64` values and returns a
/// `SendPinBoxFuture` resolving to `Result<(), E>`. `TickTask::tick` passes the
/// timestamp recorded for the previous run (0 if there was none) as the first
/// `u64` and the current timestamp as the second.
type TickTaskRoutine<E> =
dyn Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static;
2021-11-22 16:28:30 +00:00
2022-01-04 04:58:26 +00:00
/// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds.
/// If the prior tick is still running when the period elapses, that run is allowed to finish and no new run is
/// started; the next run happens on a later call to `tick` once the previous one has completed.
/// Keep routines short-lived (shorter than the tick period) if you need them to run with regular periodicity.
2022-07-12 12:02:22 +00:00
pub struct TickTask<E: Send + 'static> {
2021-11-22 16:28:30 +00:00
/// Timestamp recorded when the routine was last started; 0 means it has never been started.
last_timestamp_us: AtomicU64,
/// Minimum interval between two starts of the routine, in microseconds.
tick_period_us: u64,
2022-07-10 21:36:50 +00:00
/// The callback to run on each tick; installed once through `set_routine`.
routine: OnceCell<Box<TickTaskRoutine<E>>>,
2022-06-13 00:58:02 +00:00
/// Stop source for the most recently started run. `None` until the first run is started
/// and again after `stop` has taken it.
stop_source: AsyncMutex<Option<StopSource>>,
2022-07-10 21:36:50 +00:00
/// Holds the spawned routine future and its result until it is checked or joined.
single_future: MustJoinSingleFuture<Result<(), E>>,
2022-10-05 00:09:32 +00:00
/// Set to `true` when the routine future starts executing and back to `false` when it completes.
running: Arc<AtomicBool>,
2021-11-22 16:28:30 +00:00
}
2022-07-12 12:02:22 +00:00
impl<E: Send + 'static> TickTask<E> {
2021-11-22 16:28:30 +00:00
pub fn new_us(tick_period_us: u64) -> Self {
Self {
last_timestamp_us: AtomicU64::new(0),
2021-11-27 17:44:21 +00:00
tick_period_us,
2021-11-22 16:28:30 +00:00
routine: OnceCell::new(),
2022-06-13 00:58:02 +00:00
stop_source: AsyncMutex::new(None),
single_future: MustJoinSingleFuture::new(),
2022-10-05 00:09:32 +00:00
running: Arc::new(AtomicBool::new(false)),
2021-11-22 16:28:30 +00:00
}
}
2022-01-27 14:53:01 +00:00
pub fn new_ms(tick_period_ms: u32) -> Self {
Self {
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_ms as u64) * 1000u64,
routine: OnceCell::new(),
2022-06-13 00:58:02 +00:00
stop_source: AsyncMutex::new(None),
single_future: MustJoinSingleFuture::new(),
2022-10-05 00:09:32 +00:00
running: Arc::new(AtomicBool::new(false)),
2022-01-27 14:53:01 +00:00
}
}
2021-11-22 16:28:30 +00:00
pub fn new(tick_period_sec: u32) -> Self {
Self {
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_sec as u64) * 1000000u64,
routine: OnceCell::new(),
2022-06-13 00:58:02 +00:00
stop_source: AsyncMutex::new(None),
single_future: MustJoinSingleFuture::new(),
2022-10-05 00:09:32 +00:00
running: Arc::new(AtomicBool::new(false)),
2021-11-22 16:28:30 +00:00
}
}
2022-07-10 21:36:50 +00:00
pub fn set_routine(
&self,
routine: impl Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static,
) {
self.routine.set(Box::new(routine)).map_err(drop).unwrap();
2021-11-22 16:28:30 +00:00
}
2022-10-05 00:09:32 +00:00
pub fn is_running(&self) -> bool {
self.running.load(core::sync::atomic::Ordering::Relaxed)
}
2022-07-10 21:36:50 +00:00
pub async fn stop(&self) -> Result<(), E> {
2022-06-13 00:58:02 +00:00
// drop the stop source if we have one
let opt_stop_source = &mut *self.stop_source.lock().await;
if opt_stop_source.is_none() {
// already stopped, just return
2022-06-15 18:05:04 +00:00
trace!("tick task already stopped");
2022-06-13 00:58:02 +00:00
return Ok(());
}
2022-06-15 18:05:04 +00:00
drop(opt_stop_source.take());
2022-06-13 00:58:02 +00:00
// wait for completion of the tick task
2022-06-15 18:05:04 +00:00
trace!("stopping single future");
2022-06-13 00:58:02 +00:00
match self.single_future.join().await {
2021-11-22 16:28:30 +00:00
Ok(Some(Err(err))) => Err(err),
_ => Ok(()),
}
}
2022-07-10 21:36:50 +00:00
pub async fn tick(&self) -> Result<(), E> {
2022-11-27 02:37:23 +00:00
let now = get_timestamp();
2021-11-22 16:28:30 +00:00
let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
2022-11-22 03:50:42 +00:00
if last_timestamp_us != 0u64 && now.saturating_sub(last_timestamp_us) < self.tick_period_us
{
2022-06-15 18:05:04 +00:00
// It's not time yet
return Ok(());
}
// Lock the stop source, tells us if we have ever started this future
let opt_stop_source = &mut *self.stop_source.lock().await;
if opt_stop_source.is_some() {
// See if the previous execution finished with an error
match self.single_future.check().await {
Ok(Some(Err(e))) => {
// We have an error result, which means the singlefuture ran but we need to propagate the error
return Err(e);
}
Ok(Some(Ok(()))) => {
// We have an ok result, which means the singlefuture ran, and we should run it again this tick
2021-11-22 16:28:30 +00:00
}
2022-06-15 18:05:04 +00:00
Ok(None) => {
// No prior result to return which means things are still running
// We can just return now, since the singlefuture will not run a second time
return Ok(());
2021-11-22 16:28:30 +00:00
}
2022-06-15 18:05:04 +00:00
Err(()) => {
// If we get this, it's because we are joining the singlefuture already
// Don't bother running but this is not an error in this case
return Ok(());
}
};
}
// Run the singlefuture
let stop_source = StopSource::new();
2022-10-05 00:09:32 +00:00
let stop_token = stop_source.token();
let running = self.running.clone();
let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now);
let wrapped_routine = Box::pin(async move {
running.store(true, core::sync::atomic::Ordering::Relaxed);
let out = routine.await;
running.store(false, core::sync::atomic::Ordering::Relaxed);
out
});
match self.single_future.single_spawn(wrapped_routine).await {
2022-06-15 18:05:04 +00:00
// We should have already consumed the result of the last run, or there was none
// and we should definitely have run, because the prior 'check()' operation
// should have ensured the singlefuture was ready to run
Ok((None, true)) => {
// Set new timer
self.last_timestamp_us.store(now, Ordering::Release);
// Save new stopper
*opt_stop_source = Some(stop_source);
Ok(())
}
// All other conditions should not be reachable
_ => {
unreachable!();
2021-11-22 16:28:30 +00:00
}
}
}
}