veilid/veilid-server/src/log_safe_channel.rs
2022-03-10 19:00:59 -05:00

105 lines
3.2 KiB
Rust

use parking_lot::Mutex;
use std::sync::Arc;
// Must use async_std channel to send to main thread from blocking thread
use async_std::channel::bounded as async_bounded;
use async_std::channel::Receiver as AsyncReceiver;
pub use async_std::channel::RecvError;
// Must use std mpsc so no logs are generated by async code
use std::sync::mpsc::sync_channel as std_sync_channel;
use std::sync::mpsc::SyncSender as StdSender;
use std::sync::mpsc::TrySendError as StdTrySendError;
//////////////////////////////////////////
/// Handle used to shut down the sending side of the log channel.
///
/// It holds the same shared, lock-protected slot for the std `SyncSender`
/// that `LogSafeChannel::new` gives to the writer shim. Cloning the handle
/// clones the `Arc`, so all clones refer to that one slot.
#[derive(Clone)]
pub struct LogSafeChannelCloser {
/// Shared slot holding the std sender; `close` empties it.
sender: Arc<Mutex<Option<StdSender<String>>>>,
}
impl LogSafeChannelCloser {
    /// Closes the channel from the writer side.
    ///
    /// Takes the std sender out of the shared slot and drops it while the
    /// lock is still held. Subsequent writes through the shim find the slot
    /// empty and fail with `BrokenPipe`. Calling this more than once is
    /// harmless: later calls find the slot already empty.
    pub fn close(&self) {
        let mut slot = self.sender.lock();
        // Dropping the taken sender is what disconnects the std channel.
        drop(slot.take());
    }
}
//////////////////////////////////////////
/// `std::io::Write` adapter that forwards each written buffer, converted to
/// a `String`, over the std sync channel held in the shared slot.
pub struct LogSafeChannelWriterShim {
/// Shared slot holding the std sender; empty once the channel was closed.
sender: Arc<Mutex<Option<StdSender<String>>>>,
}
impl std::io::Write for LogSafeChannelWriterShim {
    /// Sends `buf` as one `String` message without ever blocking.
    ///
    /// The bytes are decoded lossily as UTF-8 (invalid sequences become
    /// U+FFFD) and handed to the std sender with `try_send`.
    ///
    /// # Errors
    ///
    /// * `WouldBlock` when the bounded std channel is full.
    /// * `BrokenPipe` when the receiving side is gone, or when the sender
    ///   has already been removed from the shared slot.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        use std::io::ErrorKind;

        let text = String::from_utf8_lossy(buf).into_owned();
        // The guard stays alive until the function returns; `try_send`
        // never blocks, so the lock is only held briefly.
        let slot = self.sender.lock();

        let failure = match slot.as_ref() {
            None => Some(ErrorKind::BrokenPipe),
            Some(tx) => match tx.try_send(text) {
                Ok(()) => None,
                Err(StdTrySendError::Full(_)) => Some(ErrorKind::WouldBlock),
                Err(StdTrySendError::Disconnected(_)) => Some(ErrorKind::BrokenPipe),
            },
        };

        match failure {
            Some(kind) => Err(std::io::Error::from(kind)),
            // The whole buffer was queued as a single message.
            None => Ok(buf.len()),
        }
    }

    /// No-op: every `write` already hands its data to the channel.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
/// Writer handed out by `LogSafeChannel::new`: a std `LineWriter` wrapped
/// around the channel shim.
pub type LogSafeChannelWriter = std::io::LineWriter<LogSafeChannelWriterShim>;
//////////////////////////////////////////
/// Receiving end of the log channel, read asynchronously via `recv`.
#[derive(Clone)]
pub struct LogSafeChannel {
/// async_std receiver fed by the forwarding task spawned in `new`.
async_receiver: AsyncReceiver<String>,
}
impl LogSafeChannel {
    /// Builds the channel and returns `(receiver, writer, closer)`.
    ///
    /// Two bounded queues of 1024 messages each are chained together: the
    /// writer pushes into a std sync channel, and a spawned forwarding task
    /// moves every message from there into an async_std channel that `recv`
    /// reads from. The writer is a `LineWriter` with a 65536-byte buffer
    /// around the shim. The writer and the closer share the same sender slot.
    pub fn new() -> (Self, LogSafeChannelWriter, LogSafeChannelCloser) {
        let (std_sender, std_receiver) = std_sync_channel(1024);
        let (async_sender, async_receiver) = async_bounded(1024);
        let shared_slot = Arc::new(Mutex::new(Some(std_sender)));

        // Forwarding task: std channel -> async channel. It ends when either
        // side is closed. The returned handle is dropped, detaching the task.
        //
        // NOTE(review): `std_receiver.recv()` is a blocking call made from
        // inside an async task, so it occupies an executor thread while it
        // waits. Confirm this is acceptable for the runtime configuration.
        async_std::task::spawn(async move {
            // Presumably written as `loop` + `let` (not `while let`) so the
            // borrow of the std receiver ends before the `.await` below;
            // verify before simplifying.
            #[allow(clippy::while_let_loop)]
            loop {
                let received = std_receiver.recv();
                let message = match received {
                    Ok(text) => text,
                    // Every std sender was dropped: nothing more will arrive.
                    Err(_) => break,
                };
                let forwarded = async_sender.send(message).await;
                if forwarded.is_err() {
                    // The async channel is closed, so stop forwarding.
                    break;
                }
            }
        });

        let shim = LogSafeChannelWriterShim {
            sender: Arc::clone(&shared_slot),
        };
        let writer = LogSafeChannelWriter::with_capacity(65536, shim);
        let closer = LogSafeChannelCloser {
            sender: shared_slot,
        };
        let channel = Self { async_receiver };

        (channel, writer, closer)
    }

    /// Waits for the next forwarded log message.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` when the async receiver reports an error, which
    /// is passed through unchanged.
    pub async fn recv(&mut self) -> Result<String, RecvError> {
        let receiver = &self.async_receiver;
        receiver.recv().await
    }
}