Add InfluxDB/Grafana monitoring
This commit is contained in:
@@ -1 +1 @@
|
||||
from . import commands, db, proxy
|
||||
from . import commands, db, proxy, stats
|
@@ -1,6 +1,8 @@
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
import discord
|
||||
|
||||
@@ -38,7 +40,7 @@ async def on_message(message):
|
||||
# Split into args. shlex sucks so we don't bother with quotes
|
||||
args = message.content.split(" ")
|
||||
|
||||
from pluralkit import proxy, utils
|
||||
from pluralkit import proxy, utils, stats
|
||||
|
||||
command_items = utils.command_map.items()
|
||||
command_items = sorted(command_items, key=lambda x: len(x[0]), reverse=True)
|
||||
@@ -54,7 +56,14 @@ async def on_message(message):
|
||||
args = []
|
||||
|
||||
async with client.pool.acquire() as conn:
|
||||
time_before = time.perf_counter()
|
||||
await func(conn, message, args)
|
||||
time_after = time.perf_counter()
|
||||
|
||||
# Report command time stats
|
||||
execution_time = time_after - time_before
|
||||
response_time = (datetime.now() - message.timestamp).total_seconds()
|
||||
await stats.report_command(command, execution_time, response_time)
|
||||
return
|
||||
|
||||
# Try doing proxy parsing
|
||||
@@ -82,7 +91,7 @@ async def on_socket_raw_receive(msg):
|
||||
pass
|
||||
|
||||
async def run():
|
||||
from pluralkit import db
|
||||
from pluralkit import db, stats
|
||||
try:
|
||||
logger.info("Connecting to database...")
|
||||
pool = await db.connect()
|
||||
@@ -92,6 +101,7 @@ async def run():
|
||||
await db.create_tables(conn)
|
||||
|
||||
logger.info("Connecting to InfluxDB...")
|
||||
await stats.connect()
|
||||
|
||||
client.pool = pool
|
||||
logger.info("Connecting to Discord...")
|
||||
|
@@ -3,9 +3,9 @@ import time
|
||||
import asyncpg
|
||||
import asyncpg.exceptions
|
||||
|
||||
from pluralkit import stats
|
||||
from pluralkit.bot import logger
|
||||
|
||||
|
||||
async def connect():
|
||||
while True:
|
||||
try:
|
||||
@@ -17,11 +17,17 @@ async def connect():
|
||||
def db_wrap(func):
|
||||
async def inner(*args, **kwargs):
|
||||
before = time.perf_counter()
|
||||
res = await func(*args, **kwargs)
|
||||
after = time.perf_counter()
|
||||
try:
|
||||
res = await func(*args, **kwargs)
|
||||
after = time.perf_counter()
|
||||
|
||||
logger.debug(" - DB call {} took {:.2f} ms".format(func.__name__, (after - before) * 1000))
|
||||
return res
|
||||
logger.debug(" - DB call {} took {:.2f} ms".format(func.__name__, (after - before) * 1000))
|
||||
await stats.report_db_query(func.__name__, after - before, True)
|
||||
|
||||
return res
|
||||
except asyncpg.exceptions.PostgresError:
|
||||
await stats.report_db_query(func.__name__, time.perf_counter() - before, False)
|
||||
logger.exception("Error from database query {}".format(func.__name__))
|
||||
return inner
|
||||
|
||||
@db_wrap
|
||||
@@ -223,6 +229,14 @@ async def update_server(conn, server_id: str, logging_channel_id: str):
|
||||
logger.debug("Updating server settings (id={}, log_channel={})".format(server_id, logging_channel_id))
|
||||
await conn.execute("insert into servers (id, log_channel) values ($1, $2) on conflict (id) do update set log_channel = $2", int(server_id), logging_channel_id)
|
||||
|
||||
@db_wrap
|
||||
async def member_count(conn):
|
||||
return await conn.fetchval("select count(*) from members")
|
||||
|
||||
@db_wrap
|
||||
async def system_count(conn):
|
||||
return await conn.fetchval("select count(*) from systems")
|
||||
|
||||
async def create_tables(conn):
|
||||
await conn.execute("""create table if not exists systems (
|
||||
id serial primary key,
|
||||
|
@@ -6,7 +6,7 @@ import time
|
||||
import aiohttp
|
||||
import discord
|
||||
|
||||
from pluralkit import db
|
||||
from pluralkit import db, stats
|
||||
from pluralkit.bot import client, logger
|
||||
|
||||
def make_log_embed(hook_message, member, channel_name):
|
||||
@@ -98,20 +98,28 @@ async def send_hook_message(member, hook_id, hook_token, text=None, image_url=No
|
||||
fd.add_field("file", image_resp.content, content_type=image_resp.content_type, filename=image_resp.url.name)
|
||||
|
||||
# Send the actual webhook request, and wait for a response
|
||||
async with session.post("https://discordapp.com/api/v6/webhooks/{}/{}?wait=true".format(hook_id, hook_token),
|
||||
data=fd,
|
||||
headers=req_headers) as resp:
|
||||
if resp.status == 200:
|
||||
resp_data = await resp.json()
|
||||
# Make a fake message object for passing on - this is slightly broken but works for most things
|
||||
msg = discord.Message(reactions=[], **resp_data)
|
||||
time_before = time.perf_counter()
|
||||
try:
|
||||
async with session.post("https://discordapp.com/api/v6/webhooks/{}/{}?wait=true".format(hook_id, hook_token),
|
||||
data=fd,
|
||||
headers=req_headers) as resp:
|
||||
if resp.status == 200:
|
||||
resp_data = await resp.json()
|
||||
|
||||
# Make sure it's added to the client's message cache - otherwise events r
|
||||
#client.messages.append(msg)
|
||||
return msg
|
||||
else:
|
||||
# Fake a Discord exception, also because #yolo
|
||||
raise discord.HTTPException(resp, await resp.text())
|
||||
# Make a fake message object for passing on - this is slightly broken but works for most things
|
||||
msg = discord.Message(reactions=[], **resp_data)
|
||||
|
||||
# Report to stats
|
||||
await stats.report_webhook(time.perf_counter() - time_before, True)
|
||||
return msg
|
||||
else:
|
||||
await stats.report_webhook(time.perf_counter() - time_before, False)
|
||||
|
||||
# Fake a Discord exception, also because #yolo
|
||||
raise discord.HTTPException(resp, await resp.text())
|
||||
except aiohttp.ClientResponseError:
|
||||
await stats.report_webhook(time.perf_counter() - time_before, False)
|
||||
logger.exception("Error while sending webhook message")
|
||||
|
||||
|
||||
async def proxy_message(conn, member, trigger_message, inner):
|
||||
|
29
bot/pluralkit/stats.py
Normal file
29
bot/pluralkit/stats.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from aioinflux import InfluxDBClient
|
||||
|
||||
from pluralkit.bot import logger
|
||||
|
||||
client = None
|
||||
async def connect():
|
||||
global client
|
||||
client = InfluxDBClient(host="influx", db="pluralkit")
|
||||
await client.create_database(db="pluralkit")
|
||||
|
||||
async def report_db_query(query_name, time, success):
|
||||
await client.write({
|
||||
"measurement": "database_query",
|
||||
"tags": {"query": query_name},
|
||||
"fields": {"response_time": time, "success": int(success)}
|
||||
})
|
||||
|
||||
async def report_command(command_name, execution_time, response_time):
|
||||
await client.write({
|
||||
"measurement": "command",
|
||||
"tags": {"command": command_name},
|
||||
"fields": {"execution_time": execution_time, "response_time": response_time}
|
||||
})
|
||||
|
||||
async def report_webhook(time, success):
|
||||
await client.write({
|
||||
"measurement": "webhook",
|
||||
"fields": {"response_time": time, "success": int(success)}
|
||||
})
|
Reference in New Issue
Block a user