Move stats stuff into its own class. Closes #1
This commit is contained in:
parent
cd0af5321c
commit
a130e2215a
@ -7,8 +7,9 @@ from datetime import datetime
|
||||
|
||||
import discord
|
||||
|
||||
from pluralkit import db, stats
|
||||
from pluralkit import db
|
||||
from pluralkit.bot import channel_logger, commands, proxy
|
||||
from pluralkit.stats import InfluxStatCollector, NullStatCollector
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s")
|
||||
# logging.getLogger("pluralkit").setLevel(logging.DEBUG)
|
||||
@ -24,8 +25,10 @@ class PluralKitBot:
|
||||
self.client.event(self.on_message)
|
||||
self.client.event(self.on_socket_raw_receive)
|
||||
|
||||
self.stats = NullStatCollector()
|
||||
|
||||
self.channel_logger = channel_logger.ChannelLogger(self.client)
|
||||
self.proxy = proxy.Proxy(self.client, token, self.channel_logger)
|
||||
self.proxy = proxy.Proxy(self.client, token, self.channel_logger, self.stats)
|
||||
|
||||
async def on_error(self, evt, *args, **kwargs):
|
||||
self.logger.exception("Error while handling event {} with arguments {}:".format(evt, args))
|
||||
@ -93,7 +96,7 @@ class PluralKitBot:
|
||||
# Report command time stats
|
||||
execution_time = time_after - time_before
|
||||
response_time = (datetime.now() - message.timestamp).total_seconds()
|
||||
await stats.report_command(command_name, execution_time, response_time)
|
||||
await self.stats.report_command(command_name, execution_time, response_time)
|
||||
|
||||
return True
|
||||
|
||||
@ -105,8 +108,7 @@ class PluralKitBot:
|
||||
async def periodical_stat_timer(self, pool):
|
||||
async with pool.acquire() as conn:
|
||||
while True:
|
||||
from pluralkit import stats
|
||||
await stats.report_periodical_stats(conn)
|
||||
await self.stats.report_periodical_stats(conn)
|
||||
await asyncio.sleep(30)
|
||||
|
||||
async def run(self):
|
||||
@ -125,8 +127,8 @@ class PluralKitBot:
|
||||
await db.create_tables(conn)
|
||||
|
||||
self.logger.info("Connecting to InfluxDB...")
|
||||
await stats.connect()
|
||||
|
||||
self.stats = await InfluxStatCollector.connect()
|
||||
|
||||
self.logger.info("Starting periodical stat reporting...")
|
||||
asyncio.get_event_loop().create_task(self.periodical_stat_timer(self.pool))
|
||||
|
||||
|
@ -7,8 +7,9 @@ from typing import List, Optional
|
||||
import aiohttp
|
||||
import discord
|
||||
|
||||
from pluralkit import db, stats
|
||||
from pluralkit import db
|
||||
from pluralkit.bot import channel_logger, utils
|
||||
from pluralkit.stats import StatCollector
|
||||
|
||||
logger = logging.getLogger("pluralkit.bot.proxy")
|
||||
|
||||
@ -86,12 +87,13 @@ class DeletionPermissionError(Exception):
|
||||
|
||||
|
||||
class Proxy:
|
||||
def __init__(self, client: discord.Client, token: str, logger: channel_logger.ChannelLogger):
|
||||
def __init__(self, client: discord.Client, token: str, logger: channel_logger.ChannelLogger, stats: StatCollector):
|
||||
self.logger = logging.getLogger("pluralkit.bot.proxy")
|
||||
self.session = aiohttp.ClientSession()
|
||||
self.client = client
|
||||
self.token = token
|
||||
self.channel_logger = logger
|
||||
self.stats = stats
|
||||
|
||||
async def save_channel_webhook(self, conn, channel: discord.Channel, id: str, token: str) -> (str, str):
|
||||
await db.add_webhook(conn, channel.id, id, token)
|
||||
@ -171,7 +173,7 @@ class Proxy:
|
||||
message = await resp.json()
|
||||
|
||||
# Report webhook stats to Influx
|
||||
await stats.report_webhook(time.perf_counter() - time_before, True)
|
||||
await self.stats.report_webhook(time.perf_counter() - time_before, True)
|
||||
|
||||
await db.add_message(conn, message["id"], message["channel_id"], member.id, original_message.author.id,
|
||||
text or "")
|
||||
@ -211,7 +213,7 @@ class Proxy:
|
||||
message_id=message["id"])
|
||||
elif resp.status == 404 and not has_already_retried:
|
||||
# Report webhook stats to Influx
|
||||
await stats.report_webhook(time.perf_counter() - time_before, False)
|
||||
await self.stats.report_webhook(time.perf_counter() - time_before, False)
|
||||
|
||||
# Webhook doesn't exist. Delete it from the DB, create, and add a new one
|
||||
self.logger.warning("Webhook registered in DB doesn't exist, deleting hook from DB, re-adding, and trying again (channel={}, hook={})".format(original_message.channel.id, hook_id))
|
||||
@ -222,7 +224,7 @@ class Proxy:
|
||||
return await self.do_proxy_message(conn, member, original_message, text, attachment_url, has_already_retried=True)
|
||||
else:
|
||||
# Report webhook stats to Influx
|
||||
await stats.report_webhook(time.perf_counter() - time_before, False)
|
||||
await self.stats.report_webhook(time.perf_counter() - time_before, False)
|
||||
|
||||
raise discord.HTTPException(resp, await resp.text())
|
||||
|
||||
|
@ -26,11 +26,12 @@ def db_wrap(func):
|
||||
after = time.perf_counter()
|
||||
|
||||
logger.debug(" - DB call {} took {:.2f} ms".format(func.__name__, (after - before) * 1000))
|
||||
await stats.report_db_query(func.__name__, after - before, True)
|
||||
# TODO: find some way to give this func access to the bot's stats object
|
||||
#await stats.report_db_query(func.__name__, after - before, True)
|
||||
|
||||
return res
|
||||
except asyncpg.exceptions.PostgresError:
|
||||
await stats.report_db_query(func.__name__, time.perf_counter() - before, False)
|
||||
#await stats.report_db_query(func.__name__, time.perf_counter() - before, False)
|
||||
logger.exception("Error from database query {}".format(func.__name__))
|
||||
return inner
|
||||
|
||||
|
@ -1,57 +1,69 @@
|
||||
from aioinflux import InfluxDBClient
|
||||
|
||||
client = None
|
||||
async def connect():
|
||||
global client
|
||||
client = InfluxDBClient(host="influx", db="pluralkit")
|
||||
await client.create_database(db="pluralkit")
|
||||
|
||||
async def report_db_query(query_name, time, success):
|
||||
if not client:
|
||||
return
|
||||
class StatCollector:
|
||||
async def report_db_query(self, query_name, time, success):
|
||||
pass
|
||||
|
||||
await client.write({
|
||||
"measurement": "database_query",
|
||||
"tags": {"query": query_name},
|
||||
"fields": {"response_time": time, "success": int(success)}
|
||||
})
|
||||
async def report_command(self, command_name, execution_time, response_time):
|
||||
pass
|
||||
|
||||
async def report_command(command_name, execution_time, response_time):
|
||||
if not client:
|
||||
return
|
||||
async def report_webhook(self, time, success):
|
||||
pass
|
||||
|
||||
await client.write({
|
||||
"measurement": "command",
|
||||
"tags": {"command": command_name},
|
||||
"fields": {"execution_time": execution_time, "response_time": response_time}
|
||||
})
|
||||
async def report_periodical_stats(self, conn):
|
||||
pass
|
||||
|
||||
async def report_webhook(time, success):
|
||||
if not client:
|
||||
return
|
||||
|
||||
await client.write({
|
||||
"measurement": "webhook",
|
||||
"fields": {"response_time": time, "success": int(success)}
|
||||
})
|
||||
class NullStatCollector(StatCollector):
|
||||
pass
|
||||
|
||||
async def report_periodical_stats(conn):
|
||||
if not client:
|
||||
return
|
||||
|
||||
from pluralkit import db
|
||||
class InfluxStatCollector(StatCollector):
|
||||
@staticmethod
|
||||
async def connect():
|
||||
client = InfluxDBClient(host="influx", db="pluralkit")
|
||||
await client.create_database(db="pluralkit")
|
||||
|
||||
systems = await db.system_count(conn)
|
||||
members = await db.member_count(conn)
|
||||
messages = await db.message_count(conn)
|
||||
accounts = await db.account_count(conn)
|
||||
return InfluxStatCollector(client)
|
||||
|
||||
await client.write({
|
||||
"measurement": "stats",
|
||||
"fields": {
|
||||
"systems": systems,
|
||||
"members": members,
|
||||
"messages": messages,
|
||||
"accounts": accounts
|
||||
}
|
||||
})
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
|
||||
async def report_db_query(self, query_name, time, success):
|
||||
await self.client.write({
|
||||
"measurement": "database_query",
|
||||
"tags": {"query": query_name},
|
||||
"fields": {"response_time": time, "success": int(success)}
|
||||
})
|
||||
|
||||
async def report_command(self, command_name, execution_time, response_time):
|
||||
await self.client.write({
|
||||
"measurement": "command",
|
||||
"tags": {"command": command_name},
|
||||
"fields": {"execution_time": execution_time, "response_time": response_time}
|
||||
})
|
||||
|
||||
async def report_webhook(self, time, success):
|
||||
await self.client.write({
|
||||
"measurement": "webhook",
|
||||
"fields": {"response_time": time, "success": int(success)}
|
||||
})
|
||||
|
||||
async def report_periodical_stats(self, conn):
|
||||
from pluralkit import db
|
||||
|
||||
systems = await db.system_count(conn)
|
||||
members = await db.member_count(conn)
|
||||
messages = await db.message_count(conn)
|
||||
accounts = await db.account_count(conn)
|
||||
|
||||
await self.client.write({
|
||||
"measurement": "stats",
|
||||
"fields": {
|
||||
"systems": systems,
|
||||
"members": members,
|
||||
"messages": messages,
|
||||
"accounts": accounts
|
||||
}
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user