From 6be8dd07737d9a7460cc5eda96158e796f03a98a Mon Sep 17 00:00:00 2001 From: spiral Date: Wed, 13 Apr 2022 08:48:06 -0400 Subject: [PATCH] feat(gateway): split out event handler to separate file, remove twilight cache --- gateway/Cargo.lock | 97 ++---------------------------------- gateway/Cargo.toml | 2 +- gateway/src/evt.rs | 108 ++++++++++++++++++++++++++++++++++++++++ gateway/src/main.rs | 119 +++----------------------------------------- 4 files changed, 119 insertions(+), 207 deletions(-) create mode 100644 gateway/src/evt.rs diff --git a/gateway/Cargo.lock b/gateway/Cargo.lock index 6f8af8ec..33d3e74e 100644 --- a/gateway/Cargo.lock +++ b/gateway/Cargo.lock @@ -235,17 +235,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dashmap" -version = "5.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c" -dependencies = [ - "cfg-if", - "num_cpus", - "parking_lot 0.12.0", -] - [[package]] name = "deadpool" version = "0.9.2" @@ -819,17 +808,7 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core 0.8.5", -] - -[[package]] -name = "parking_lot" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" -dependencies = [ - "lock_api", - "parking_lot_core 0.9.2", + "parking_lot_core", ] [[package]] @@ -846,19 +825,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "parking_lot_core" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-sys", -] - [[package]] name = "percent-encoding" version = "2.1.0" @@ -910,6 +876,7 @@ dependencies = [ "deadpool", "deadpool-postgres", "futures", + "lazy_static", "libc", "postgres-types", "procfs", @@ -920,7 +887,6 @@ dependencies = [ "tokio-stream", "tracing", "tracing-subscriber", - "twilight-cache-inmemory", "twilight-gateway", "twilight-gateway-queue", "twilight-http", @@ -1434,7 +1400,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot 0.11.2", + "parking_lot", "pin-project-lite", "signal-hook-registry", "tokio-macros", @@ -1464,7 +1430,7 @@ dependencies = [ "fallible-iterator", "futures", "log", - "parking_lot 0.11.2", + "parking_lot", "percent-encoding", "phf", "pin-project-lite", @@ -1618,18 +1584,6 @@ dependencies = [ "webpki", ] -[[package]] -name = "twilight-cache-inmemory" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644c73ca822657936e6da14b96c11808d6ec848798d1ea6ecc15aef1fc12a383" -dependencies = [ - "bitflags", - "dashmap", - "serde", - "twilight-model", -] - [[package]] name = "twilight-gateway" version = "0.10.1" @@ -1898,46 +1852,3 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "windows-sys" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" -dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_msvc" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" - -[[package]] -name = "windows_i686_gnu" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" - -[[package]] -name = "windows_i686_msvc" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 7709f9ad..50611dd3 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -13,12 +13,12 @@ tracing = "0.1" tracing-subscriber = "0.3" tokio = { version = "1", features = ["full"] } tokio-stream = { version = "0.1", features = ["sync"] } +lazy_static = "1.4.0" procfs = "0.12.0" libc = "0.2.122" # Twilight -twilight-cache-inmemory = "0.10.0" twilight-gateway = "0.10.0" twilight-gateway-queue = "0.10.0" twilight-http = "0.10.0" diff --git a/gateway/src/evt.rs b/gateway/src/evt.rs new file mode 100644 index 00000000..e0012bcc --- /dev/null +++ b/gateway/src/evt.rs @@ -0,0 +1,108 @@ +use deadpool_postgres::Pool; +use redis::AsyncCommands; +use twilight_model::gateway::event::GatewayEventDeserializer; +use std::sync::Arc; +use tracing::info; + +use twilight_gateway::Event; +use twilight_http::Client as HttpClient; + +lazy_static::lazy_static! { + static ref ALLOWED_EVENTS: Vec<&'static str> = [ + "INTERACTION_CREATE", + "MESSAGE_CREATE", + "MESSAGE_DELETE", + "MESSAGE_DELETE_BULK", + "MESSAGE_UPDATE", + "MESSAGE_REACTION_ADD", + ].to_vec(); +} + +pub async fn handle_event<'a>( + shard_id: u64, + event: Event, + http: Arc, + _db: Pool, + rconn: redis::Client +) -> anyhow::Result<()> { + match event { + Event::GatewayInvalidateSession(resumable) => { + info!("shard {} session invalidated, resumable? {}", shard_id, resumable); + } + Event::ShardConnected(_) => { + info!("shard {} connected", shard_id); + } + Event::ShardDisconnected(info) => { + info!("shard {} disconnected, code: {:?}, reason: {:?}", shard_id, info.code, info.reason); + } + Event::ShardPayload(payload) => { + let deserializer = GatewayEventDeserializer::from_json(std::str::from_utf8(&payload.bytes)?).unwrap(); + if deserializer.op() == 0 && ALLOWED_EVENTS.contains(&deserializer.event_type_ref().unwrap()) { + let mut conn = rconn.get_async_connection().await?; + conn.publish::<&str, Vec, i32>("evt", payload.bytes).await?; + } + } + Event::MessageCreate(msg) => { + if msg.content == "pkt;test" { + // let message_context = db::get_message_context( + // &db, + // msg.author.id.get(), + // msg.guild_id.map(|x| x.get()).unwrap_or(0), + // msg.channel_id.get(), + // ) + // .await?; + + // let content = format!("message context:\n```\n{:#?}\n```", message_context); + // http.create_message(msg.channel_id) + // .reply(msg.id) + // .content(&content)? + // .exec() + // .await?; + + // let proxy_members = db::get_proxy_members( + // &db, + // msg.author.id.get(), + // msg.guild_id.map(|x| x.get()).unwrap_or(0), + // ) + // .await?; + + // let content = format!("proxy members:\n```\n{:#?}\n```", proxy_members); + // info!("{}", content); + // http.create_message(msg.channel_id) + // .reply(msg.id) + // .content(&content)? + // .exec() + // .await?; + + // let cache_stats = cache.stats(); + + // let pid = unsafe { libc::getpid() }; + // let pagesize = { + // unsafe { + // libc::sysconf(libc::_SC_PAGESIZE) + // } + // }; + + // let p = procfs::process::Process::new(pid)?; + // let content = format!( + // "[rust]\nguilds:{}\nchannels:{}\nroles:{}\nusers:{}\nmembers:{}\n\nmemory usage: {}", + // cache_stats.guilds(), + // cache_stats.channels(), + // cache_stats.roles(), + // cache_stats.users(), + // cache_stats.members(), + // p.stat.rss * pagesize + // ); + + // http.create_message(msg.channel_id) + // .reply(msg.id) + // .content(&content)? + // .exec() + // .await?; + } + } + _ => {} + } + + Ok(()) +} diff --git a/gateway/src/main.rs b/gateway/src/main.rs index e57146f4..c457ed83 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,17 +1,16 @@ use deadpool_postgres::Pool; use futures::StreamExt; -use redis::AsyncCommands; use std::{sync::Arc, env}; use tracing::{error, info, Level}; -use twilight_cache_inmemory::{InMemoryCache, ResourceType}; use twilight_gateway::{ cluster::{Events, ShardScheme}, - Cluster, Event, EventTypeFlags, Intents, + Cluster, EventTypeFlags, Intents, }; use twilight_http::Client as HttpClient; mod config; +mod evt; mod db; mod util; @@ -25,10 +24,10 @@ async fn main() -> anyhow::Result<()> { let http = Arc::new(HttpClient::new(cfg.token.clone())); let rconn = redis::Client::open(cfg.redis_addr.clone()).unwrap(); let (_cluster, events) = init_gateway(&cfg, rconn.clone()).await?; - let cache = init_cache(); + // let cache = init_cache(); let db = db::init_db(&cfg).await?; - run(http, events, cache, db, rconn).await?; + run(http, events, db, rconn).await?; Ok(()) } @@ -36,25 +35,22 @@ async fn main() -> anyhow::Result<()> { async fn run( http: Arc, mut events: Events, - cache: Arc, db: Pool, rconn: redis::Client, ) -> anyhow::Result<()> { while let Some((shard_id, event)) = events.next().await { - cache.update(&event); + // cache.update(&event); let http_cloned = http.clone(); - let cache_cloned = cache.clone(); let db_cloned = db.clone(); let rconn_cloned = rconn.clone(); tokio::spawn(async move { - let result = handle_event( + let result = evt::handle_event( shard_id, event, http_cloned, - cache_cloned, db_cloned, rconn_cloned ) @@ -68,93 +64,6 @@ async fn run( Ok(()) } -async fn handle_event<'a>( - shard_id: u64, - event: Event, - http: Arc, - cache: Arc, - _db: Pool, - rconn: redis::Client -) -> anyhow::Result<()> { - match event { - Event::GatewayInvalidateSession(resumable) => { - info!("shard {} session invalidated, resumable? {}", shard_id, resumable); - } - Event::ShardConnected(_) => { - info!("shard {} connected", shard_id); - } - Event::ShardDisconnected(info) => { - info!("shard {} disconnected, code: {:?}, reason: {:?}", shard_id, info.code, info.reason); - } - Event::ShardPayload(payload) => { - let mut conn = rconn.get_async_connection().await?; - conn.publish::<&str, Vec, i32>("evt", payload.bytes).await?; - } - Event::MessageCreate(msg) => { - if msg.content == "pkt;test" { - // let message_context = db::get_message_context( - // &db, - // msg.author.id.get(), - // msg.guild_id.map(|x| x.get()).unwrap_or(0), - // msg.channel_id.get(), - // ) - // .await?; - - // let content = format!("message context:\n```\n{:#?}\n```", message_context); - // http.create_message(msg.channel_id) - // .reply(msg.id) - // .content(&content)? - // .exec() - // .await?; - - // let proxy_members = db::get_proxy_members( - // &db, - // msg.author.id.get(), - // msg.guild_id.map(|x| x.get()).unwrap_or(0), - // ) - // .await?; - - // let content = format!("proxy members:\n```\n{:#?}\n```", proxy_members); - // info!("{}", content); - // http.create_message(msg.channel_id) - // .reply(msg.id) - // .content(&content)? - // .exec() - // .await?; - - let cache_stats = cache.stats(); - - let pid = unsafe { libc::getpid() }; - let pagesize = { - unsafe { - libc::sysconf(libc::_SC_PAGESIZE) - } - }; - - let p = procfs::process::Process::new(pid)?; - let content = format!( - "[rust]\nguilds:{}\nchannels:{}\nroles:{}\nusers:{}\nmembers:{}\n\nmemory usage: {}", - cache_stats.guilds(), - cache_stats.channels(), - cache_stats.roles(), - cache_stats.users(), - cache_stats.members(), - p.stat.rss * pagesize - ); - - http.create_message(msg.channel_id) - .reply(msg.id) - .content(&content)? - .exec() - .await?; - } - } - _ => {} - } - - Ok(()) -} - fn init_tracing() { tracing_subscriber::fmt() .with_max_level(Level::INFO) @@ -203,10 +112,7 @@ async fn init_gateway( | EventTypeFlags::SHARD_PAYLOAD | EventTypeFlags::SHARD_CONNECTED | EventTypeFlags::SHARD_DISCONNECTED - | EventTypeFlags::GUILD_CREATE - | EventTypeFlags::CHANNEL_CREATE | EventTypeFlags::MESSAGE_CREATE - // | EventTypeFlags::MESSAGE_UPDATE ) .queue(Arc::new(queue)) .build() @@ -220,16 +126,3 @@ async fn init_gateway( Ok((cluster, events)) } - -fn init_cache() -> Arc { - let cache = InMemoryCache::builder() - .resource_types( - ResourceType::GUILD - | ResourceType::CHANNEL - | ResourceType::ROLE - | ResourceType::USER - // | ResourceType::MEMBER - ) - .build(); - Arc::new(cache) -}