feat(gateway): split out event handler to separate file, remove twilight cache

This commit is contained in:
spiral
2022-04-13 08:48:06 -04:00
parent c2094e3b7a
commit 6be8dd0773
4 changed files with 119 additions and 207 deletions

108
gateway/src/evt.rs Normal file
View File

@@ -0,0 +1,108 @@
use deadpool_postgres::Pool;
use redis::AsyncCommands;
use twilight_model::gateway::event::GatewayEventDeserializer;
use std::sync::Arc;
use tracing::info;
use twilight_gateway::Event;
use twilight_http::Client as HttpClient;
lazy_static::lazy_static! {
static ref ALLOWED_EVENTS: Vec<&'static str> = [
"INTERACTION_CREATE",
"MESSAGE_CREATE",
"MESSAGE_DELETE",
"MESSAGE_DELETE_BULK",
"MESSAGE_UPDATE",
"MESSAGE_REACTION_ADD",
].to_vec();
}
pub async fn handle_event<'a>(
shard_id: u64,
event: Event,
http: Arc<HttpClient>,
_db: Pool,
rconn: redis::Client
) -> anyhow::Result<()> {
match event {
Event::GatewayInvalidateSession(resumable) => {
info!("shard {} session invalidated, resumable? {}", shard_id, resumable);
}
Event::ShardConnected(_) => {
info!("shard {} connected", shard_id);
}
Event::ShardDisconnected(info) => {
info!("shard {} disconnected, code: {:?}, reason: {:?}", shard_id, info.code, info.reason);
}
Event::ShardPayload(payload) => {
let deserializer = GatewayEventDeserializer::from_json(std::str::from_utf8(&payload.bytes)?).unwrap();
if deserializer.op() == 0 && ALLOWED_EVENTS.contains(&deserializer.event_type_ref().unwrap()) {
let mut conn = rconn.get_async_connection().await?;
conn.publish::<&str, Vec<u8>, i32>("evt", payload.bytes).await?;
}
}
Event::MessageCreate(msg) => {
if msg.content == "pkt;test" {
// let message_context = db::get_message_context(
// &db,
// msg.author.id.get(),
// msg.guild_id.map(|x| x.get()).unwrap_or(0),
// msg.channel_id.get(),
// )
// .await?;
// let content = format!("message context:\n```\n{:#?}\n```", message_context);
// http.create_message(msg.channel_id)
// .reply(msg.id)
// .content(&content)?
// .exec()
// .await?;
// let proxy_members = db::get_proxy_members(
// &db,
// msg.author.id.get(),
// msg.guild_id.map(|x| x.get()).unwrap_or(0),
// )
// .await?;
// let content = format!("proxy members:\n```\n{:#?}\n```", proxy_members);
// info!("{}", content);
// http.create_message(msg.channel_id)
// .reply(msg.id)
// .content(&content)?
// .exec()
// .await?;
// let cache_stats = cache.stats();
// let pid = unsafe { libc::getpid() };
// let pagesize = {
// unsafe {
// libc::sysconf(libc::_SC_PAGESIZE)
// }
// };
// let p = procfs::process::Process::new(pid)?;
// let content = format!(
// "[rust]\nguilds:{}\nchannels:{}\nroles:{}\nusers:{}\nmembers:{}\n\nmemory usage: {}",
// cache_stats.guilds(),
// cache_stats.channels(),
// cache_stats.roles(),
// cache_stats.users(),
// cache_stats.members(),
// p.stat.rss * pagesize
// );
// http.create_message(msg.channel_id)
// .reply(msg.id)
// .content(&content)?
// .exec()
// .await?;
}
}
_ => {}
}
Ok(())
}

View File

@@ -1,17 +1,16 @@
use deadpool_postgres::Pool;
use futures::StreamExt;
use redis::AsyncCommands;
use std::{sync::Arc, env};
use tracing::{error, info, Level};
use twilight_cache_inmemory::{InMemoryCache, ResourceType};
use twilight_gateway::{
cluster::{Events, ShardScheme},
Cluster, Event, EventTypeFlags, Intents,
Cluster, EventTypeFlags, Intents,
};
use twilight_http::Client as HttpClient;
mod config;
mod evt;
mod db;
mod util;
@@ -25,10 +24,10 @@ async fn main() -> anyhow::Result<()> {
let http = Arc::new(HttpClient::new(cfg.token.clone()));
let rconn = redis::Client::open(cfg.redis_addr.clone()).unwrap();
let (_cluster, events) = init_gateway(&cfg, rconn.clone()).await?;
let cache = init_cache();
// let cache = init_cache();
let db = db::init_db(&cfg).await?;
run(http, events, cache, db, rconn).await?;
run(http, events, db, rconn).await?;
Ok(())
}
@@ -36,25 +35,22 @@ async fn main() -> anyhow::Result<()> {
async fn run(
http: Arc<HttpClient>,
mut events: Events,
cache: Arc<InMemoryCache>,
db: Pool,
rconn: redis::Client,
) -> anyhow::Result<()> {
while let Some((shard_id, event)) = events.next().await {
cache.update(&event);
// cache.update(&event);
let http_cloned = http.clone();
let cache_cloned = cache.clone();
let db_cloned = db.clone();
let rconn_cloned = rconn.clone();
tokio::spawn(async move {
let result = handle_event(
let result = evt::handle_event(
shard_id,
event,
http_cloned,
cache_cloned,
db_cloned,
rconn_cloned
)
@@ -68,93 +64,6 @@ async fn run(
Ok(())
}
async fn handle_event<'a>(
shard_id: u64,
event: Event,
http: Arc<HttpClient>,
cache: Arc<InMemoryCache>,
_db: Pool,
rconn: redis::Client
) -> anyhow::Result<()> {
match event {
Event::GatewayInvalidateSession(resumable) => {
info!("shard {} session invalidated, resumable? {}", shard_id, resumable);
}
Event::ShardConnected(_) => {
info!("shard {} connected", shard_id);
}
Event::ShardDisconnected(info) => {
info!("shard {} disconnected, code: {:?}, reason: {:?}", shard_id, info.code, info.reason);
}
Event::ShardPayload(payload) => {
let mut conn = rconn.get_async_connection().await?;
conn.publish::<&str, Vec<u8>, i32>("evt", payload.bytes).await?;
}
Event::MessageCreate(msg) => {
if msg.content == "pkt;test" {
// let message_context = db::get_message_context(
// &db,
// msg.author.id.get(),
// msg.guild_id.map(|x| x.get()).unwrap_or(0),
// msg.channel_id.get(),
// )
// .await?;
// let content = format!("message context:\n```\n{:#?}\n```", message_context);
// http.create_message(msg.channel_id)
// .reply(msg.id)
// .content(&content)?
// .exec()
// .await?;
// let proxy_members = db::get_proxy_members(
// &db,
// msg.author.id.get(),
// msg.guild_id.map(|x| x.get()).unwrap_or(0),
// )
// .await?;
// let content = format!("proxy members:\n```\n{:#?}\n```", proxy_members);
// info!("{}", content);
// http.create_message(msg.channel_id)
// .reply(msg.id)
// .content(&content)?
// .exec()
// .await?;
let cache_stats = cache.stats();
let pid = unsafe { libc::getpid() };
let pagesize = {
unsafe {
libc::sysconf(libc::_SC_PAGESIZE)
}
};
let p = procfs::process::Process::new(pid)?;
let content = format!(
"[rust]\nguilds:{}\nchannels:{}\nroles:{}\nusers:{}\nmembers:{}\n\nmemory usage: {}",
cache_stats.guilds(),
cache_stats.channels(),
cache_stats.roles(),
cache_stats.users(),
cache_stats.members(),
p.stat.rss * pagesize
);
http.create_message(msg.channel_id)
.reply(msg.id)
.content(&content)?
.exec()
.await?;
}
}
_ => {}
}
Ok(())
}
fn init_tracing() {
tracing_subscriber::fmt()
.with_max_level(Level::INFO)
@@ -203,10 +112,7 @@ async fn init_gateway(
| EventTypeFlags::SHARD_PAYLOAD
| EventTypeFlags::SHARD_CONNECTED
| EventTypeFlags::SHARD_DISCONNECTED
| EventTypeFlags::GUILD_CREATE
| EventTypeFlags::CHANNEL_CREATE
| EventTypeFlags::MESSAGE_CREATE
// | EventTypeFlags::MESSAGE_UPDATE
)
.queue(Arc::new(queue))
.build()
@@ -220,16 +126,3 @@ async fn init_gateway(
Ok((cluster, events))
}
fn init_cache() -> Arc<InMemoryCache> {
let cache = InMemoryCache::builder()
.resource_types(
ResourceType::GUILD
| ResourceType::CHANNEL
| ResourceType::ROLE
| ResourceType::USER
// | ResourceType::MEMBER
)
.build();
Arc::new(cache)
}