veilid/veilid-tools/src/must_join_handle.rs

108 lines
3.5 KiB
Rust
Raw Normal View History

2022-06-28 03:46:29 +00:00
use super::*;
2022-11-27 02:37:23 +00:00
2022-06-13 00:58:02 +00:00
use core::task::{Context, Poll};
#[derive(Debug)]
pub struct MustJoinHandle<T> {
2022-06-28 03:46:29 +00:00
join_handle: Option<LowLevelJoinHandle<T>>,
2022-06-15 18:05:04 +00:00
completed: bool,
2022-06-13 00:58:02 +00:00
}
impl<T> MustJoinHandle<T> {
2022-06-28 03:46:29 +00:00
pub fn new(join_handle: LowLevelJoinHandle<T>) -> Self {
2022-06-13 00:58:02 +00:00
Self {
2022-06-28 03:46:29 +00:00
join_handle: Some(join_handle),
2022-06-15 18:05:04 +00:00
completed: false,
2022-06-13 00:58:02 +00:00
}
}
2022-06-28 03:46:29 +00:00
2023-06-07 21:39:10 +00:00
pub fn detach(mut self) {
cfg_if! {
if #[cfg(feature="rt-async-std")] {
self.join_handle = None;
} else if #[cfg(feature="rt-tokio")] {
self.join_handle = None;
} else if #[cfg(target_arch = "wasm32")] {
2023-07-02 21:23:04 +00:00
if let Some(jh) = self.join_handle.take() {
jh.detach();
}
2023-06-07 21:39:10 +00:00
} else {
compile_error!("needs executor implementation")
}
}
self.completed = true;
}
2022-06-28 04:10:21 +00:00
#[allow(unused_mut)]
2022-06-28 03:46:29 +00:00
pub async fn abort(mut self) {
if !self.completed {
cfg_if! {
if #[cfg(feature="rt-async-std")] {
if let Some(jh) = self.join_handle.take() {
jh.cancel().await;
self.completed = true;
}
} else if #[cfg(feature="rt-tokio")] {
if let Some(jh) = self.join_handle.take() {
jh.abort();
let _ = jh.await;
self.completed = true;
}
2022-06-28 04:10:21 +00:00
} else if #[cfg(target_arch = "wasm32")] {
drop(self.join_handle.take());
self.completed = true;
} else {
compile_error!("needs executor implementation")
2022-06-28 03:46:29 +00:00
}
2022-06-28 04:10:21 +00:00
2022-06-28 03:46:29 +00:00
}
}
}
2022-06-13 00:58:02 +00:00
}
impl<T> Drop for MustJoinHandle<T> {
fn drop(&mut self) {
// panic if we haven't completed
2022-06-15 18:05:04 +00:00
if !self.completed {
2022-06-13 00:58:02 +00:00
panic!("MustJoinHandle was not completed upon drop. Add cooperative cancellation where appropriate to ensure this is completed before drop.")
}
}
}
impl<T: 'static> Future for MustJoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2022-06-28 03:46:29 +00:00
match Pin::new(self.join_handle.as_mut().unwrap()).poll(cx) {
2022-06-13 00:58:02 +00:00
Poll::Ready(t) => {
2022-06-29 14:13:49 +00:00
if self.completed {
panic!("should not poll completed join handle");
}
2022-06-15 18:05:04 +00:00
self.completed = true;
2022-06-28 03:46:29 +00:00
cfg_if! {
if #[cfg(feature="rt-async-std")] {
Poll::Ready(t)
} else if #[cfg(feature="rt-tokio")] {
2022-06-29 14:13:49 +00:00
match t {
Ok(t) => Poll::Ready(t),
Err(e) => {
if e.is_panic() {
// Resume the panic on the main task
std::panic::resume_unwind(e.into_panic());
} else {
panic!("join error was not a panic, should not poll after abort");
}
}
}
2022-11-27 14:00:20 +00:00
} else if #[cfg(target_arch = "wasm32")] {
2022-06-28 04:10:21 +00:00
Poll::Ready(t)
} else {
compile_error!("needs executor implementation")
2022-06-28 03:46:29 +00:00
}
}
2022-06-13 00:58:02 +00:00
}
Poll::Pending => Poll::Pending,
}
}
}