chore: remove in-progress unused stuff from repo
This commit is contained in:
parent
13a44364ad
commit
7ffe83ab5e
@ -24,10 +24,6 @@ LABEL org.opencontainers.image.source = "https://github.com/PluralKit/PluralKit"
|
||||
WORKDIR /app
|
||||
COPY --from=build /app ./
|
||||
|
||||
# Runtime dependency in prod
|
||||
RUN apt update && apt install -y curl
|
||||
ADD scripts/run-clustered.sh /
|
||||
|
||||
# Allow overriding CMD from eg. docker-compose to run API layer too
|
||||
ENTRYPOINT ["dotnet"]
|
||||
CMD ["bin/PluralKit.Bot.dll"]
|
||||
|
3
gateway/.gitignore
vendored
3
gateway/.gitignore
vendored
@ -1,3 +0,0 @@
|
||||
/target
|
||||
|
||||
config.json
|
2031
gateway/Cargo.lock
generated
2031
gateway/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,35 +0,0 @@
|
||||
[package]
|
||||
name = "gateway"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
# Infrastructure
|
||||
anyhow = "1"
|
||||
config = { version = "0.11", default-features = false, features = ["json"] }
|
||||
futures = "0.3"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-stream = { version = "0.1", features = ["sync"] }
|
||||
lazy_static = "1.4.0"
|
||||
|
||||
procfs = "0.12.0"
|
||||
libc = "0.2.122"
|
||||
|
||||
# Twilight
|
||||
twilight-gateway = "0.10.1"
|
||||
twilight-gateway-queue = "0.10.0"
|
||||
twilight-http = "0.10.0"
|
||||
twilight-model = "0.10.2"
|
||||
|
||||
# Database
|
||||
deadpool = "0.9"
|
||||
deadpool-postgres = "0.10"
|
||||
postgres-types = { version = "0.2", features = ["derive"] }
|
||||
tokio-postgres = { version = "0.7", features = ["with-serde_json-1", "with-uuid-0_8"] }
|
||||
|
||||
redis = { version = "0.21.5", features = ["aio", "tokio-comp"] }
|
||||
|
||||
myriad = { path = "../myriad_rs" }
|
@ -1,22 +0,0 @@
|
||||
# twilight requires newer rustc than what is in alpine:latest
|
||||
FROM alpine:edge AS builder
|
||||
|
||||
RUN apk add cargo protobuf
|
||||
|
||||
# Precache crates.io index
|
||||
RUN cargo search >/dev/null
|
||||
|
||||
WORKDIR /build
|
||||
COPY proto/ /build/proto
|
||||
COPY gateway/ /build/gateway
|
||||
COPY myriad_rs/ /build/myriad_rs
|
||||
|
||||
# todo: cache build of myriad_rs elsewhere
|
||||
|
||||
RUN (cd gateway && cargo build --release)
|
||||
|
||||
FROM alpine:latest
|
||||
|
||||
COPY --from=builder /build/gateway/target/release/pluralkit /opt/gateway
|
||||
|
||||
ENTRYPOINT ["/opt/gateway"]
|
@ -1,22 +0,0 @@
|
||||
use config::{self, Config};
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct BotConfig {
|
||||
pub token: String,
|
||||
pub max_concurrency: u64,
|
||||
pub database: String,
|
||||
pub redis_addr: String,
|
||||
pub redis_gateway_queue_addr: String,
|
||||
pub shard_count: u64
|
||||
}
|
||||
|
||||
pub fn load_config() -> BotConfig {
|
||||
let mut settings = Config::default();
|
||||
settings.merge(config::File::with_name("config")).unwrap();
|
||||
settings
|
||||
.merge(config::Environment::with_prefix("PluralKit"))
|
||||
.unwrap();
|
||||
|
||||
settings.try_into::<BotConfig>().unwrap()
|
||||
}
|
@ -1,144 +0,0 @@
|
||||
use std::{str::FromStr, time::SystemTime};
|
||||
|
||||
use crate::config;
|
||||
use anyhow::Context;
|
||||
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
|
||||
use tokio_postgres::{self, types::FromSql, Row};
|
||||
use twilight_model::id::Id;
|
||||
use twilight_model::id::marker::ChannelMarker;
|
||||
|
||||
pub async fn init_db(cfg: &config::BotConfig) -> anyhow::Result<Pool> {
|
||||
let pg_config = tokio_postgres::config::Config::from_str(&cfg.database)
|
||||
.context("could not parse connection string")?;
|
||||
|
||||
let mgr_config = ManagerConfig {
|
||||
recycling_method: RecyclingMethod::Fast,
|
||||
};
|
||||
let mgr = Manager::from_config(pg_config, tokio_postgres::NoTls, mgr_config);
|
||||
let pool = Pool::builder(mgr)
|
||||
.max_size(16)
|
||||
.build()
|
||||
.context("could not initialize pool")?;
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
pub async fn get_message_context(
|
||||
pool: &Pool,
|
||||
account_id: u64,
|
||||
guild_id: u64,
|
||||
channel_id: u64,
|
||||
) -> anyhow::Result<Option<MessageContext>> {
|
||||
let client = pool.get().await?;
|
||||
let stmt = client
|
||||
.prepare_cached("select * from message_context($1, $2, $3)")
|
||||
.await?;
|
||||
let result = client
|
||||
.query_opt(
|
||||
&stmt,
|
||||
&[
|
||||
&(account_id as i64),
|
||||
&(guild_id as i64),
|
||||
&(channel_id as i64),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.context("could not fetch message context")?;
|
||||
|
||||
Ok(result.map(parse_message_context))
|
||||
}
|
||||
|
||||
pub async fn get_proxy_members(
|
||||
pool: &Pool,
|
||||
account_id: u64,
|
||||
guild_id: u64,
|
||||
) -> anyhow::Result<Vec<ProxyMember>> {
|
||||
let client = pool.get().await?;
|
||||
let stmt = client
|
||||
.prepare_cached("select * from proxy_members($1, $2)")
|
||||
.await?;
|
||||
let result = client
|
||||
.query(&stmt, &[&(account_id as i64), &(guild_id as i64)])
|
||||
.await
|
||||
.context("could not fetch proxy members")?;
|
||||
|
||||
Ok(result.into_iter().map(parse_proxy_member).collect())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MessageContext {
|
||||
pub system_id: Option<i32>,
|
||||
pub log_channel: Option<Id<ChannelMarker>>,
|
||||
pub in_blacklist: bool,
|
||||
pub in_log_blacklist: bool,
|
||||
pub log_cleanup_enabled: bool,
|
||||
pub proxy_enabled: bool,
|
||||
pub last_switch: Option<i32>,
|
||||
pub last_switch_members: Option<Vec<i32>>,
|
||||
pub last_switch_timestamp: Option<SystemTime>,
|
||||
pub system_tag: Option<String>,
|
||||
pub system_guild_tag: Option<String>,
|
||||
pub tag_enabled: bool,
|
||||
pub system_avatar: Option<String>,
|
||||
pub allow_autoproxy: bool,
|
||||
pub latch_timeout: Option<i32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, FromSql)]
|
||||
#[postgres(name = "proxy_tag")]
|
||||
pub struct ProxyTag {
|
||||
pub prefix: Option<String>,
|
||||
pub suffix: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ProxyMember {
|
||||
pub id: i32,
|
||||
pub proxy_tags: Vec<ProxyTag>,
|
||||
pub keep_proxy: bool,
|
||||
pub server_name: Option<String>,
|
||||
pub display_name: Option<String>,
|
||||
pub name: String,
|
||||
pub server_avatar: Option<String>,
|
||||
pub avatar: Option<String>,
|
||||
pub allow_autoproxy: bool,
|
||||
pub color: Option<String>,
|
||||
}
|
||||
|
||||
fn parse_message_context(row: Row) -> MessageContext {
|
||||
MessageContext {
|
||||
system_id: row.get("system_id"),
|
||||
log_channel: to_channel_id_opt(row.get("log_channel")),
|
||||
in_blacklist: row.get::<_, Option<_>>("in_blacklist").unwrap_or(false),
|
||||
in_log_blacklist: row.get::<_, Option<_>>("in_log_blacklist").unwrap_or(false),
|
||||
log_cleanup_enabled: row.get("log_cleanup_enabled"),
|
||||
proxy_enabled: row.get("proxy_enabled"),
|
||||
last_switch: row.get("last_switch"),
|
||||
last_switch_members: row.get("last_switch_members"),
|
||||
last_switch_timestamp: row.get("last_switch_timestamp"),
|
||||
system_tag: row.get("system_tag"),
|
||||
system_guild_tag: row.get("system_guild_tag"),
|
||||
tag_enabled: row.get("tag_enabled"),
|
||||
system_avatar: row.get("system_avatar"),
|
||||
allow_autoproxy: row.get("allow_autoproxy"),
|
||||
latch_timeout: row.get("latch_timeout"),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_proxy_member(row: Row) -> ProxyMember {
|
||||
ProxyMember {
|
||||
id: row.get("id"),
|
||||
proxy_tags: row.get("proxy_tags"),
|
||||
keep_proxy: row.get("keep_proxy"),
|
||||
server_name: row.get("server_name"),
|
||||
display_name: row.get("display_name"),
|
||||
name: row.get("name"),
|
||||
server_avatar: row.get("server_avatar"),
|
||||
avatar: row.get("avatar"),
|
||||
allow_autoproxy: row.get("allow_autoproxy"),
|
||||
color: row.get("color"),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_channel_id_opt(id: Option<i64>) -> Option<Id<ChannelMarker>> {
|
||||
id.and_then(|x| Some(Id::<ChannelMarker>::new(x as u64)))
|
||||
}
|
@ -1,110 +0,0 @@
|
||||
use deadpool_postgres::Pool;
|
||||
use redis::AsyncCommands;
|
||||
use twilight_model::gateway::event::GatewayEventDeserializer;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
|
||||
use twilight_gateway::Event;
|
||||
use twilight_http::Client as HttpClient;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref ALLOWED_EVENTS: Vec<&'static str> = [
|
||||
"INTERACTION_CREATE",
|
||||
"MESSAGE_CREATE",
|
||||
"MESSAGE_DELETE",
|
||||
"MESSAGE_DELETE_BULK",
|
||||
"MESSAGE_UPDATE",
|
||||
"MESSAGE_REACTION_ADD",
|
||||
].to_vec();
|
||||
}
|
||||
|
||||
pub async fn handle_event<'a>(
|
||||
shard_id: u64,
|
||||
event: Event,
|
||||
http: Arc<HttpClient>,
|
||||
_db: Pool,
|
||||
rconn: redis::Client
|
||||
) -> anyhow::Result<()> {
|
||||
myriad::cache::handle_event(event.clone(), rconn.clone()).await?;
|
||||
|
||||
match event {
|
||||
Event::GatewayInvalidateSession(resumable) => {
|
||||
info!("shard {} session invalidated, resumable? {}", shard_id, resumable);
|
||||
}
|
||||
Event::ShardConnected(_) => {
|
||||
info!("shard {} connected", shard_id);
|
||||
}
|
||||
Event::ShardDisconnected(info) => {
|
||||
info!("shard {} disconnected, code: {:?}, reason: {:?}", shard_id, info.code, info.reason);
|
||||
}
|
||||
Event::ShardPayload(payload) => {
|
||||
let deserializer = GatewayEventDeserializer::from_json(std::str::from_utf8(&payload.bytes)?).unwrap();
|
||||
if deserializer.op() == 0 && ALLOWED_EVENTS.contains(&deserializer.event_type_ref().unwrap()) {
|
||||
let mut conn = rconn.get_async_connection().await?;
|
||||
conn.publish::<&str, Vec<u8>, i32>(&format!("evt-{shard_id}"), payload.bytes).await?;
|
||||
}
|
||||
}
|
||||
Event::MessageCreate(msg) => {
|
||||
if msg.content == "pkt;test" {
|
||||
// let message_context = db::get_message_context(
|
||||
// &db,
|
||||
// msg.author.id.get(),
|
||||
// msg.guild_id.map(|x| x.get()).unwrap_or(0),
|
||||
// msg.channel_id.get(),
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// let content = format!("message context:\n```\n{:#?}\n```", message_context);
|
||||
// http.create_message(msg.channel_id)
|
||||
// .reply(msg.id)
|
||||
// .content(&content)?
|
||||
// .exec()
|
||||
// .await?;
|
||||
|
||||
// let proxy_members = db::get_proxy_members(
|
||||
// &db,
|
||||
// msg.author.id.get(),
|
||||
// msg.guild_id.map(|x| x.get()).unwrap_or(0),
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// let content = format!("proxy members:\n```\n{:#?}\n```", proxy_members);
|
||||
// info!("{}", content);
|
||||
// http.create_message(msg.channel_id)
|
||||
// .reply(msg.id)
|
||||
// .content(&content)?
|
||||
// .exec()
|
||||
// .await?;
|
||||
|
||||
// let cache_stats = cache.stats();
|
||||
|
||||
// let pid = unsafe { libc::getpid() };
|
||||
// let pagesize = {
|
||||
// unsafe {
|
||||
// libc::sysconf(libc::_SC_PAGESIZE)
|
||||
// }
|
||||
// };
|
||||
|
||||
// let p = procfs::process::Process::new(pid)?;
|
||||
// let content = format!(
|
||||
// "[rust]\nguilds:{}\nchannels:{}\nroles:{}\nusers:{}\nmembers:{}\n\nmemory usage: {}",
|
||||
// cache_stats.guilds(),
|
||||
// cache_stats.channels(),
|
||||
// cache_stats.roles(),
|
||||
// cache_stats.users(),
|
||||
// cache_stats.members(),
|
||||
// p.stat.rss * pagesize
|
||||
// );
|
||||
|
||||
// http.create_message(msg.channel_id)
|
||||
// .reply(msg.id)
|
||||
// .content(&content)?
|
||||
// .exec()
|
||||
// .await?;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,119 +0,0 @@
|
||||
use deadpool_postgres::Pool;
|
||||
use futures::StreamExt;
|
||||
use std::{sync::Arc, env};
|
||||
use tracing::{error, info, Level};
|
||||
|
||||
use twilight_gateway::{
|
||||
cluster::{Events, ShardScheme},
|
||||
Cluster, EventTypeFlags, Intents,
|
||||
};
|
||||
use twilight_http::Client as HttpClient;
|
||||
|
||||
mod config;
|
||||
mod evt;
|
||||
mod db;
|
||||
mod util;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
init_tracing();
|
||||
info!("starting...");
|
||||
|
||||
let cfg = config::load_config();
|
||||
|
||||
let http = Arc::new(HttpClient::new(cfg.token.clone()));
|
||||
let rconn = redis::Client::open(cfg.redis_addr.clone()).unwrap();
|
||||
let (_cluster, events) = init_gateway(&cfg, rconn.clone()).await?;
|
||||
// let cache = init_cache();
|
||||
let db = db::init_db(&cfg).await?;
|
||||
|
||||
run(http, events, db, rconn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run(
|
||||
http: Arc<HttpClient>,
|
||||
mut events: Events,
|
||||
db: Pool,
|
||||
rconn: redis::Client,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Some((shard_id, event)) = events.next().await {
|
||||
|
||||
// cache.update(&event);
|
||||
|
||||
let http_cloned = http.clone();
|
||||
let db_cloned = db.clone();
|
||||
let rconn_cloned = rconn.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let result = evt::handle_event(
|
||||
shard_id,
|
||||
event,
|
||||
http_cloned,
|
||||
db_cloned,
|
||||
rconn_cloned
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = result {
|
||||
error!("error in event handler: {:?}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init_tracing() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::INFO)
|
||||
.init();
|
||||
}
|
||||
|
||||
async fn init_gateway(
|
||||
cfg: &config::BotConfig,
|
||||
rconn: redis::Client,
|
||||
) -> anyhow::Result<(Arc<Cluster>, Events)> {
|
||||
let shard_count = cfg.shard_count.clone();
|
||||
|
||||
let scheme: ShardScheme;
|
||||
|
||||
if shard_count < 16 {
|
||||
scheme = ShardScheme::Auto;
|
||||
} else {
|
||||
let cluster_id = env::var("NOMAD_ALLOC_INDEX").or::<String>(Result::Ok("0".to_string())).unwrap().parse::<u64>().unwrap();
|
||||
let first_shard_id = 16 * cluster_id;
|
||||
|
||||
scheme = ShardScheme::try_from((first_shard_id..first_shard_id+16, shard_count)).unwrap();
|
||||
}
|
||||
|
||||
let queue = util::RedisQueue {
|
||||
client: rconn.clone(),
|
||||
concurrency: cfg.max_concurrency.clone()
|
||||
};
|
||||
|
||||
let (cluster, events) = Cluster::builder(
|
||||
cfg.token.clone(),
|
||||
Intents::GUILDS
|
||||
| Intents::DIRECT_MESSAGES
|
||||
| Intents::DIRECT_MESSAGE_REACTIONS
|
||||
| Intents::GUILD_EMOJIS_AND_STICKERS
|
||||
| Intents::GUILD_MESSAGES
|
||||
| Intents::GUILD_MESSAGE_REACTIONS
|
||||
| Intents::GUILD_WEBHOOKS
|
||||
| Intents::MESSAGE_CONTENT
|
||||
)
|
||||
.shard_scheme(scheme)
|
||||
.event_types(EventTypeFlags::all())
|
||||
.queue(Arc::new(queue))
|
||||
.build()
|
||||
.await?;
|
||||
let cluster = Arc::new(cluster);
|
||||
|
||||
let cluster_spawn = Arc::clone(&cluster);
|
||||
tokio::spawn(async move {
|
||||
cluster_spawn.up().await;
|
||||
});
|
||||
|
||||
Ok((cluster, events))
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use twilight_gateway_queue::Queue;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RedisQueue {
|
||||
pub client: redis::Client,
|
||||
pub concurrency: u64
|
||||
}
|
||||
|
||||
impl Queue for RedisQueue {
|
||||
fn request<'a>(&'a self, shard_id: [u64; 2]) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + Send + 'a>> {
|
||||
Box::pin(request_inner(self.client.clone(), self.concurrency, *shard_id.first().unwrap()))
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_inner(client: redis::Client, concurrency: u64, shard_id: u64) {
|
||||
let mut conn = client.get_async_connection().await.unwrap();
|
||||
let key = format!("pluralkit:identify:{}", (shard_id % concurrency));
|
||||
|
||||
let mut cmd = redis::cmd("SET");
|
||||
cmd.arg(key).arg("1").arg("EX").arg(6i8).arg("NX");
|
||||
|
||||
loop {
|
||||
let done = cmd.clone().query_async::<redis::aio::Connection, Option<String>>(&mut conn).await;
|
||||
if done.unwrap().is_some() {
|
||||
return
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
|
@ -1,14 +0,0 @@
|
||||
[package]
|
||||
name = "pk_core"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
chrono = "0.4.19"
|
||||
reqwest = { version = "0.11.11", default-features = false, features = ["blocking", "rustls-tls"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["json","std","env-filter"] }
|
||||
tracing-appender = "0.2"
|
@ -1,2 +0,0 @@
|
||||
pub mod tracing;
|
||||
pub mod util;
|
@ -1,105 +0,0 @@
|
||||
use reqwest::blocking::{Client, ClientBuilder};
|
||||
|
||||
use tracing::*;
|
||||
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! init {
|
||||
($e:expr) => {
|
||||
let _guard1 = pk_core::tracing::init_inner(None, None, None).await;
|
||||
let _guard2 = span!(Level::ERROR, "service_name", service_name = $e).entered();
|
||||
};
|
||||
($e:expr, $f:expr) => {
|
||||
let _guard1 = pk_core::tracing::init_inner($f, None, None).await;
|
||||
let _guard2 = span!(Level::ERROR, "service_name", service_name = $e).entered();
|
||||
};
|
||||
($e:expr, $f:expr, $g:expr, $h:expr) => {
|
||||
let _guard1 = pk_core::tracing::init_inner($f, $g, $h).await;
|
||||
let _guard2 = span!(Level::ERROR, "service_name", service_name = $e).entered();
|
||||
};
|
||||
}
|
||||
|
||||
pub use init;
|
||||
|
||||
pub async fn init_inner(url: Option<String>, user: Option<&str>, password: Option<&str>) -> Option<WorkerGuard> {
|
||||
let stdout_subscriber = tracing_subscriber::fmt().with_max_level(Level::INFO).finish();
|
||||
|
||||
if url.is_none() {
|
||||
tracing::subscriber::set_global_default(stdout_subscriber)
|
||||
.expect("Unable to set a global collector");
|
||||
return None;
|
||||
}
|
||||
|
||||
let client = tokio::task::spawn_blocking(|| {
|
||||
let client = ClientBuilder::new()
|
||||
.danger_accept_invalid_certs(true);
|
||||
client.build()
|
||||
}).await.unwrap().unwrap();
|
||||
|
||||
let elastic = ElasticLogWriter {
|
||||
user: user.map(|v| v.to_string()),
|
||||
password: password.map(|v| v.to_string()),
|
||||
url: url.unwrap(),
|
||||
client
|
||||
};
|
||||
|
||||
let (non_blocking, _guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
|
||||
.lossy(true)
|
||||
.buffered_lines_limit(2000)
|
||||
.finish(elastic);
|
||||
|
||||
let subscriber = stdout_subscriber
|
||||
.with(EnvFilter::from_default_env().add_directive(tracing::Level::TRACE.into()))
|
||||
.with(
|
||||
fmt::Layer::new()
|
||||
.json()
|
||||
.flatten_event(true)
|
||||
.with_writer(non_blocking)
|
||||
);
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber)
|
||||
.expect("Unable to set a global collector");
|
||||
|
||||
return Some(_guard);
|
||||
}
|
||||
|
||||
pub struct ElasticLogWriter {
|
||||
user: Option<String>,
|
||||
password: Option<String>,
|
||||
url: String,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl std::io::Write for ElasticLogWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
let data = String::from_utf8(buf.to_vec());
|
||||
if data.is_err() {
|
||||
return Err(Error::new(ErrorKind::Other, data.err().unwrap()));
|
||||
}
|
||||
|
||||
let orig_json = data.unwrap();
|
||||
|
||||
let cur_date = chrono::Utc::now().format("%Y-%m-%d");
|
||||
let mut builder = self.client.post(format!("{}/pluralkit-logs-{}/_doc", self.url.clone(), cur_date))
|
||||
.header("content-type", "application/json")
|
||||
.body(orig_json);
|
||||
|
||||
if self.user.is_some() {
|
||||
builder = builder.basic_auth(self.user.as_ref().unwrap(), self.password.as_ref());
|
||||
}
|
||||
|
||||
let res = builder.send();
|
||||
|
||||
match res {
|
||||
Ok(_) => Ok(buf.len()),
|
||||
Err(err) => Err(Error::new(ErrorKind::Other, err)),
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
/// get_env gets an env variable as Option<String instead of Result<String>.
|
||||
pub fn get_env(key: &str) -> Option<String> {
|
||||
match std::env::var(key) {
|
||||
Ok(val) => { Some(val) }
|
||||
Err(_) => None
|
||||
}
|
||||
}
|
1489
myriad_rs/Cargo.lock
generated
1489
myriad_rs/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,16 +0,0 @@
|
||||
[package]
|
||||
name = "myriad"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
|
||||
prost = "0.10.3"
|
||||
twilight-gateway = "0.10.1"
|
||||
redis = { version = "0.21.5", features = ["aio", "tokio-comp"] }
|
||||
|
||||
[build-dependencies]
|
||||
prost-build = "0.10.3"
|
@ -1,6 +0,0 @@
|
||||
use std::io::Result;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
prost_build::compile_protos(&["../proto/discord_cache.proto"], &["../proto/"])?;
|
||||
Ok(())
|
||||
}
|
@ -1,229 +0,0 @@
|
||||
use twilight_gateway::Event;
|
||||
use redis::AsyncCommands;
|
||||
use prost::Message;
|
||||
|
||||
include!(concat!(env!("OUT_DIR"), "/myriad.cache.rs"));
|
||||
|
||||
pub async fn handle_event<'a>(
|
||||
event: Event,
|
||||
rconn: redis::Client
|
||||
) -> anyhow::Result<()> {
|
||||
let mut conn = rconn.get_async_connection().await.unwrap();
|
||||
|
||||
match event {
|
||||
// todo: save private channels to sql (see SaveDMChannelStub / PrivateChannelService)
|
||||
// todo: save user profiles for some reason (from message create, etc)
|
||||
// todo(dotnet): remove relying on cache.OwnUserId
|
||||
// todo(dotnet): correctly calculate permissions in guild threads
|
||||
|
||||
Event::GuildCreate(guild) => {
|
||||
// todo: clear any existing guild state
|
||||
// save guild itself
|
||||
conn.hset("discord:guilds", guild.id.get(), CachedGuild{
|
||||
id: guild.id.get(),
|
||||
name: guild.name.to_string(),
|
||||
owner_id: guild.owner_id.get(),
|
||||
premium_tier: guild.premium_tier as i32,
|
||||
}.encode_to_vec()).await?;
|
||||
// save all roles in guild
|
||||
for role in guild.roles.clone().into_iter() {
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:roles", role.id.get(), CachedRole{
|
||||
id: role.id.get(),
|
||||
name: role.name,
|
||||
position: role.position as i32,
|
||||
permissions: role.permissions.bits(),
|
||||
mentionable: role.mentionable,
|
||||
}.encode_to_vec()).await?;
|
||||
// save guild-role map
|
||||
conn.hset::<String, u64, u64, i32>(format!("discord:guild_roles:{}", guild.id.get()), role.id.get(), 1).await?;
|
||||
}
|
||||
// save all channels in guild
|
||||
for channel in guild.channels.clone().into_iter() {
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:channels", channel.id.get(), CachedChannel{
|
||||
id: channel.id.get(),
|
||||
r#type: channel.kind as i32,
|
||||
position: channel.position.unwrap_or(0) as i32,
|
||||
name: channel.name,
|
||||
permission_overwrites: channel.permission_overwrites.unwrap().into_iter().map(|v| Overwrite{
|
||||
id: v.id.get(),
|
||||
r#type: v.kind as i32,
|
||||
allow: v.allow.bits(),
|
||||
deny: v.deny.bits(),
|
||||
}).collect(),
|
||||
guild_id: Some(guild.id.get()),
|
||||
parent_id: channel.parent_id.map(|v| v.get()),
|
||||
}.encode_to_vec()).await?;
|
||||
// save guild-channel map
|
||||
conn.hset::<String, u64, u64, i32>(format!("discord:guild_channels:{}", guild.id.get()), channel.id.get(), 1).await?;
|
||||
}
|
||||
|
||||
// save all threads in guild (as channels lol)
|
||||
for thread in guild.threads.clone().into_iter() {
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:channels", thread.id.get(), CachedChannel{
|
||||
id: thread.id.get(),
|
||||
r#type: thread.kind as i32,
|
||||
position: thread.position.unwrap_or(0) as i32,
|
||||
name: thread.name,
|
||||
guild_id: Some(guild.id.get()),
|
||||
parent_id: thread.parent_id.map(|v| v.get()),
|
||||
..Default::default()
|
||||
}.encode_to_vec()).await?;
|
||||
// save guild-channel map
|
||||
conn.hset::<String, u64, u64, i32>(format!("discord:guild_channels:{}", guild.id.get()), thread.id.get(), 1).await?;
|
||||
}
|
||||
|
||||
// save self guild member
|
||||
conn.hset("discord:guild_members", guild.id.get(), CachedGuildMember{
|
||||
roles: guild.members.get(0).unwrap().roles.clone().into_iter().map(|r| r.get()).collect()
|
||||
}.encode_to_vec()).await?;
|
||||
|
||||
// c# code also saves users in guildCreate.Members, but I'm pretty sure that doesn't have anything now because intents
|
||||
}
|
||||
Event::GuildUpdate(guild) => {
|
||||
// save guild itself
|
||||
conn.hset("discord:guilds", guild.id.get(), CachedGuild{
|
||||
id: guild.id.get(),
|
||||
name: guild.name.to_string(),
|
||||
owner_id: guild.owner_id.get(),
|
||||
premium_tier: guild.premium_tier as i32,
|
||||
}.encode_to_vec()).await?;
|
||||
}
|
||||
Event::GuildDelete(guild) => {
|
||||
// delete guild
|
||||
conn.hdel("discord:guilds", guild.id.get()).await?;
|
||||
if let Ok(roles) = conn.hkeys::<String, Vec<u64>>(format!("discord:guild_roles:{}", guild.id.get())).await {
|
||||
for role in roles.into_iter() {
|
||||
conn.hdel("discord:roles", role).await?;
|
||||
}
|
||||
}
|
||||
conn.del(format!("discord:guild_roles:{}", guild.id.get())).await?;
|
||||
|
||||
// this probably should also delete all channels/roles/etc of the guild
|
||||
if let Ok(channel) = conn.hkeys::<String, Vec<u64>>(format!("discord:guild_channels:{}", guild.id.get())).await {
|
||||
conn.hdel("discord:channels", channel).await?;
|
||||
}
|
||||
conn.del(format!("discord:guild_channels:{}", guild.id.get())).await?;
|
||||
}
|
||||
Event::MemberUpdate(member) => {
|
||||
// save self guild member
|
||||
conn.hset("discord:guild_members", member.guild_id.get(), CachedGuildMember{
|
||||
roles: member.roles.clone().into_iter().map(|r| r.get()).collect()
|
||||
}.encode_to_vec()).await?;
|
||||
}
|
||||
Event::ChannelCreate(channel) => {
|
||||
// save channel
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:channels", channel.id.get(), CachedChannel{
|
||||
id: channel.id.get(),
|
||||
r#type: channel.kind as i32,
|
||||
position: channel.position.unwrap_or(0) as i32,
|
||||
name: channel.name.as_deref().map(|v| v.to_string()),
|
||||
permission_overwrites: channel.permission_overwrites.as_ref().unwrap().into_iter().map(|v| Overwrite{
|
||||
id: v.id.get(),
|
||||
r#type: v.kind as i32,
|
||||
allow: v.allow.bits(),
|
||||
deny: v.deny.bits(),
|
||||
}).collect(),
|
||||
guild_id: channel.guild_id.map(|v| v.get()),
|
||||
parent_id: channel.parent_id.map(|v| v.get()),
|
||||
}.encode_to_vec()).await?;
|
||||
|
||||
// update guild-channel map (if this is a guild channel)
|
||||
if let Some(guild_id) = channel.guild_id {
|
||||
conn.hset::<String, u64, u64, i32>(format!("discord:guild_channels:{}", guild_id.get()), channel.id.get(), 1).await?;
|
||||
}
|
||||
}
|
||||
Event::ChannelUpdate(channel) => {
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:channels", channel.id.get(), CachedChannel{
|
||||
id: channel.id.get(),
|
||||
r#type: channel.kind as i32,
|
||||
position: channel.position.unwrap_or(0) as i32,
|
||||
name: channel.name.as_deref().map(|v| v.to_string()),
|
||||
permission_overwrites: channel.permission_overwrites.as_ref().unwrap().into_iter().map(|v| Overwrite{
|
||||
id: v.id.get(),
|
||||
r#type: v.kind as i32,
|
||||
allow: v.allow.bits(),
|
||||
deny: v.deny.bits(),
|
||||
}).collect(),
|
||||
guild_id: channel.guild_id.map(|v| v.get()),
|
||||
parent_id: channel.parent_id.map(|v| v.get()),
|
||||
}.encode_to_vec()).await?;
|
||||
}
|
||||
Event::ChannelDelete(channel) => {
|
||||
// delete channel
|
||||
conn.hdel("discord:channels", channel.id.get()).await?;
|
||||
// update guild-channel map
|
||||
if let Some(guild_id) = channel.guild_id {
|
||||
conn.hdel(format!("discord:guild_channels:{}", guild_id.get()), channel.id.get()).await?;
|
||||
}
|
||||
}
|
||||
Event::RoleCreate(role) => {
|
||||
// save role
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:roles", role.role.id.get(), CachedRole{
|
||||
id: role.role.id.get(),
|
||||
name: role.role.name,
|
||||
position: role.role.position as i32,
|
||||
permissions: role.role.permissions.bits(),
|
||||
mentionable: role.role.mentionable,
|
||||
}.encode_to_vec()).await?;
|
||||
// update guild-role map
|
||||
conn.hset::<String, u64, u64, i32>(format!("discord:guild_roles:{}", role.guild_id.get()), role.role.id.get(), 1).await?;
|
||||
}
|
||||
Event::RoleUpdate(role) => {
|
||||
// save role
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:roles", role.role.id.get(), CachedRole{
|
||||
id: role.role.id.get(),
|
||||
name: role.role.name,
|
||||
position: role.role.position as i32,
|
||||
permissions: role.role.permissions.bits(),
|
||||
mentionable: role.role.mentionable,
|
||||
}.encode_to_vec()).await?;
|
||||
}
|
||||
Event::RoleDelete(role) => {
|
||||
// delete role
|
||||
conn.hdel("discord:roles", role.role_id.get()).await?;
|
||||
// update guild-role map
|
||||
conn.hdel(format!("discord:guild_roles:{}", role.guild_id.get()), role.role_id.get()).await?;
|
||||
}
|
||||
Event::ThreadCreate(thread) => {
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:channels", thread.id.get(), CachedChannel{
|
||||
id: thread.id.get(),
|
||||
r#type: thread.kind as i32,
|
||||
position: thread.position.unwrap_or(0) as i32,
|
||||
name: thread.name.as_deref().map(|v| v.to_string()),
|
||||
guild_id: Some(thread.guild_id.unwrap().get()),
|
||||
parent_id: thread.parent_id.map(|v| v.get()),
|
||||
..Default::default()
|
||||
}.encode_to_vec()).await?;
|
||||
|
||||
// save guild-channel map
|
||||
conn.hset::<String, u64, u64, i32>(format!("discord:guild_channels:{}", thread.guild_id.unwrap().get()), thread.id.get(), 1).await?;
|
||||
// update guild-channel map
|
||||
}
|
||||
Event::ThreadUpdate(thread) => {
|
||||
conn.hset::<&str, u64, Vec<u8>, i32>("discord:channels", thread.id.get(), CachedChannel{
|
||||
id: thread.id.get(),
|
||||
r#type: thread.kind as i32,
|
||||
position: thread.position.unwrap_or(0) as i32,
|
||||
name: thread.name.as_deref().map(|v| v.to_string()),
|
||||
guild_id: Some(thread.guild_id.unwrap().get()),
|
||||
parent_id: thread.parent_id.map(|v| v.get()),
|
||||
..Default::default()
|
||||
}.encode_to_vec()).await?;
|
||||
}
|
||||
Event::ThreadDelete(thread) => {
|
||||
// delete channel
|
||||
conn.hdel("discord:channels", thread.id.get()).await?;
|
||||
// update guild-channel map
|
||||
conn.hdel(format!("discord:guild_channels:{}", thread.guild_id.get()), thread.id.get()).await?;
|
||||
}
|
||||
Event::ThreadListSync(tls) => {
|
||||
// save channels
|
||||
}
|
||||
Event::MessageCreate(message) => {
|
||||
// save last message of channel
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1 +0,0 @@
|
||||
pub mod cache;
|
@ -1,15 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
notify () {
|
||||
curl $MGMT/notify -d "$1"
|
||||
}
|
||||
|
||||
curl $MGMT/config > pluralkit.conf
|
||||
|
||||
notify "Cluster $NOMAD_ALLOC_INDEX starting"
|
||||
|
||||
export PluralKit__Bot__Cluster__NodeName="pluralkit-$NOMAD_ALLOC_INDEX"
|
||||
|
||||
dotnet bin/PluralKit.Bot.dll
|
||||
|
||||
notify "Cluster $NOMAD_ALLOC_INDEX exited with code $?"
|
Loading…
Reference in New Issue
Block a user