feat: add elasticsearch tracing subscriber

create lib/pk_core
This commit is contained in:
spiral 2022-07-10 01:15:03 -04:00
parent 5097eb9ce2
commit 1825a00248
No known key found for this signature in database
GPG Key ID: 244A11E4B0BCF40E
4 changed files with 128 additions and 0 deletions

14
lib/pk_core/Cargo.toml Normal file
View File

@ -0,0 +1,14 @@
[package]
name = "pk_core"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = "0.4.19"
reqwest = { version = "0.11.11", default-features = false, features = ["blocking", "rustls-tls"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json","std","env-filter"] }
tracing-appender = "0.2"

2
lib/pk_core/src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod tracing;
pub mod util;

105
lib/pk_core/src/tracing.rs Normal file
View File

@ -0,0 +1,105 @@
use reqwest::blocking::{Client, ClientBuilder};
use tracing::*;
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter};
use tracing_appender::non_blocking::WorkerGuard;
use std::io::{Error, ErrorKind};
#[macro_export]
macro_rules! init {
($e:expr) => {
let _guard1 = pk_core::tracing::init_inner(None, None, None).await;
let _guard2 = span!(Level::ERROR, "service_name", service_name = $e).entered();
};
($e:expr, $f:expr) => {
let _guard1 = pk_core::tracing::init_inner($f, None, None).await;
let _guard2 = span!(Level::ERROR, "service_name", service_name = $e).entered();
};
($e:expr, $f:expr, $g:expr, $h:expr) => {
let _guard1 = pk_core::tracing::init_inner($f, $g, $h).await;
let _guard2 = span!(Level::ERROR, "service_name", service_name = $e).entered();
};
}
pub use init;
pub async fn init_inner(url: Option<String>, user: Option<&str>, password: Option<&str>) -> Option<WorkerGuard> {
let stdout_subscriber = tracing_subscriber::fmt().with_max_level(Level::INFO).finish();
if url.is_none() {
tracing::subscriber::set_global_default(stdout_subscriber)
.expect("Unable to set a global collector");
return None;
}
let client = tokio::task::spawn_blocking(|| {
let client = ClientBuilder::new()
.danger_accept_invalid_certs(true);
client.build()
}).await.unwrap().unwrap();
let elastic = ElasticLogWriter {
user: user.map(|v| v.to_string()),
password: password.map(|v| v.to_string()),
url: url.unwrap(),
client
};
let (non_blocking, _guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
.lossy(true)
.buffered_lines_limit(2000)
.finish(elastic);
let subscriber = stdout_subscriber
.with(EnvFilter::from_default_env().add_directive(tracing::Level::TRACE.into()))
.with(
fmt::Layer::new()
.json()
.flatten_event(true)
.with_writer(non_blocking)
);
tracing::subscriber::set_global_default(subscriber)
.expect("Unable to set a global collector");
return Some(_guard);
}
pub struct ElasticLogWriter {
user: Option<String>,
password: Option<String>,
url: String,
client: Client,
}
impl std::io::Write for ElasticLogWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let data = String::from_utf8(buf.to_vec());
if data.is_err() {
return Err(Error::new(ErrorKind::Other, data.err().unwrap()));
}
let orig_json = data.unwrap();
let cur_date = chrono::Utc::now().format("%Y-%m-%d");
let mut builder = self.client.post(format!("{}/pluralkit-logs-{}/_doc", self.url.clone(), cur_date))
.header("content-type", "application/json")
.body(orig_json);
if self.user.is_some() {
builder = builder.basic_auth(self.user.as_ref().unwrap(), self.password.as_ref());
}
let res = builder.send();
match res {
Ok(_) => Ok(buf.len()),
Err(err) => Err(Error::new(ErrorKind::Other, err)),
}
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

7
lib/pk_core/src/util.rs Normal file
View File

@ -0,0 +1,7 @@
/// get_env gets an env variable as Option<String instead of Result<String>.
pub fn get_env(key: &str) -> Option<String> {
match std::env::var(key) {
Ok(val) => { Some(val) }
Err(_) => None
}
}