feat: aggregate cluster statistics and send to influx with scheduled tasks worker

This commit is contained in:
spiral 2022-03-14 23:33:22 -04:00
parent 857e20b601
commit 8909330db2
No known key found for this signature in database
GPG Key ID: 244A11E4B0BCF40E
8 changed files with 106 additions and 69 deletions

View File

@ -40,46 +40,6 @@ public static class BotMetrics
Context = "Bot" Context = "Bot"
}; };
/// <summary>Options for the "Members total" gauge in the "Bot" context (no measurement unit).</summary>
public static GaugeOptions MembersTotal => new GaugeOptions
{
Context = "Bot",
Name = "Members total",
MeasurementUnit = Unit.None
};
/// <summary>Options for the "Members online" gauge in the "Bot" context (no measurement unit).</summary>
public static GaugeOptions MembersOnline => new GaugeOptions
{
Context = "Bot",
Name = "Members online",
MeasurementUnit = Unit.None
};
/// <summary>Options for the "Guilds" gauge in the "Bot" context (no measurement unit).</summary>
public static GaugeOptions Guilds => new GaugeOptions
{
Context = "Bot",
Name = "Guilds",
MeasurementUnit = Unit.None
};
/// <summary>Options for the "Channels" gauge in the "Bot" context (no measurement unit).</summary>
public static GaugeOptions Channels => new GaugeOptions
{
Context = "Bot",
Name = "Channels",
MeasurementUnit = Unit.None
};
/// <summary>Options for the "Shard Latency" gauge in the "Bot" context; no measurement unit is set here.</summary>
public static GaugeOptions ShardLatency => new GaugeOptions
{
Context = "Bot",
Name = "Shard Latency"
};
/// <summary>Options for the "Shards Connected" gauge in the "Bot" context, measured in connections.</summary>
public static GaugeOptions ShardsConnected => new GaugeOptions
{
Name = "Shards Connected",
MeasurementUnit = Unit.Connections,
Context = "Bot"
};
public static MeterOptions WebhookCacheMisses => new() public static MeterOptions WebhookCacheMisses => new()
{ {
Name = "Webhook cache misses", Name = "Webhook cache misses",
@ -87,13 +47,6 @@ public static class BotMetrics
MeasurementUnit = Unit.Calls MeasurementUnit = Unit.Calls
}; };
/// <summary>Options for the "Webhook Cache Size" gauge in the "Bot" context, measured in items.</summary>
public static GaugeOptions WebhookCacheSize => new GaugeOptions
{
Name = "Webhook Cache Size",
MeasurementUnit = Unit.Items,
Context = "Bot"
};
public static TimerOptions WebhookResponseTime => new() public static TimerOptions WebhookResponseTime => new()
{ {
Name = "Webhook Response Time", Name = "Webhook Response Time",

View File

@ -156,7 +156,7 @@ public class Checks
throw new PKSyntaxError("You need to specify a channel."); throw new PKSyntaxError("You need to specify a channel.");
var error = "Channel not found or you do not have permissions to access it."; var error = "Channel not found or you do not have permissions to access it.";
// todo: this breaks if channel is not in cache and bot does not have View Channel permissions // todo: this breaks if channel is not in cache and bot does not have View Channel permissions
var channel = await ctx.MatchChannel(); var channel = await ctx.MatchChannel();
if (channel == null || channel.GuildId == null) if (channel == null || channel.GuildId == null)

View File

@ -4,6 +4,8 @@ using App.Metrics;
using Myriad.Cache; using Myriad.Cache;
using Newtonsoft.Json;
using NodaTime.Extensions; using NodaTime.Extensions;
using PluralKit.Core; using PluralKit.Core;
@ -20,17 +22,20 @@ public class PeriodicStatCollector
private readonly DbConnectionCountHolder _countHolder; private readonly DbConnectionCountHolder _countHolder;
private readonly CpuStatService _cpu; private readonly CpuStatService _cpu;
private readonly BotConfig _botConfig;
private readonly CoreConfig _config;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IMetrics _metrics; private readonly IMetrics _metrics;
private readonly ModelRepository _repo; private readonly ModelRepository _repo;
private readonly RedisService _redis;
private readonly WebhookCacheService _webhookCache; private readonly WebhookCacheService _webhookCache;
public PeriodicStatCollector(IMetrics metrics, ILogger logger, WebhookCacheService webhookCache, public PeriodicStatCollector(IMetrics metrics, ILogger logger, WebhookCacheService webhookCache,
DbConnectionCountHolder countHolder, CpuStatService cpu, ModelRepository repo, DbConnectionCountHolder countHolder, CpuStatService cpu, ModelRepository repo,
IDiscordCache cache) BotConfig botConfig, CoreConfig config, RedisService redis, IDiscordCache cache)
{ {
_metrics = metrics; _metrics = metrics;
_webhookCache = webhookCache; _webhookCache = webhookCache;
@ -38,6 +43,9 @@ public class PeriodicStatCollector
_cpu = cpu; _cpu = cpu;
_repo = repo; _repo = repo;
_cache = cache; _cache = cache;
_botConfig = botConfig;
_config = config;
_redis = redis;
_logger = logger.ForContext<PeriodicStatCollector>(); _logger = logger.ForContext<PeriodicStatCollector>();
} }
@ -59,19 +67,19 @@ public class PeriodicStatCollector
channelCount++; channelCount++;
} }
_metrics.Measure.Gauge.SetValue(BotMetrics.Guilds, guildCount); if (_config.UseRedisMetrics)
_metrics.Measure.Gauge.SetValue(BotMetrics.Channels, channelCount); {
var db = _redis.Connection.GetDatabase();
// Aggregate DB stats await db.HashSetAsync("pluralkit:cluster_stats", new StackExchange.Redis.HashEntry[] {
// just fetching from database here - actual updating of the data is done in PluralKit.ScheduledTasks new(_botConfig.Cluster.NodeIndex, JsonConvert.SerializeObject(new ClusterMetricInfo
// if you're not running ScheduledTasks and want up-to-date counts, uncomment the following line: {
// await _repo.UpdateStats(); GuildCount = guildCount,
var counts = await _repo.GetStats(); ChannelCount = channelCount,
_metrics.Measure.Gauge.SetValue(CoreMetrics.SystemCount, counts.SystemCount); DatabaseConnectionCount = _countHolder.ConnectionCount,
_metrics.Measure.Gauge.SetValue(CoreMetrics.MemberCount, counts.MemberCount); WebhookCacheSize = _webhookCache.CacheSize,
_metrics.Measure.Gauge.SetValue(CoreMetrics.GroupCount, counts.GroupCount); })),
_metrics.Measure.Gauge.SetValue(CoreMetrics.SwitchCount, counts.SwitchCount); });
_metrics.Measure.Gauge.SetValue(CoreMetrics.MessageCount, counts.MessageCount); }
// Process info // Process info
var process = Process.GetCurrentProcess(); var process = Process.GetCurrentProcess();
@ -82,12 +90,6 @@ public class PeriodicStatCollector
_metrics.Measure.Gauge.SetValue(CoreMetrics.ProcessHandles, process.HandleCount); _metrics.Measure.Gauge.SetValue(CoreMetrics.ProcessHandles, process.HandleCount);
_metrics.Measure.Gauge.SetValue(CoreMetrics.CpuUsage, await _cpu.EstimateCpuUsage()); _metrics.Measure.Gauge.SetValue(CoreMetrics.CpuUsage, await _cpu.EstimateCpuUsage());
// Database info
_metrics.Measure.Gauge.SetValue(CoreMetrics.DatabaseConnections, _countHolder.ConnectionCount);
// Other shiz
_metrics.Measure.Gauge.SetValue(BotMetrics.WebhookCacheSize, _webhookCache.CacheSize);
stopwatch.Stop(); stopwatch.Stop();
_logger.Debug("Updated metrics in {Time}", stopwatch.ElapsedDuration()); _logger.Debug("Updated metrics in {Time}", stopwatch.ElapsedDuration());
} }

View File

@ -6,6 +6,7 @@ public class CoreConfig
{ {
public string Database { get; set; } public string Database { get; set; }
public string RedisAddr { get; set; } public string RedisAddr { get; set; }
public bool UseRedisMetrics { get; set; } = false;
public string SentryUrl { get; set; } public string SentryUrl { get; set; }
public string InfluxUrl { get; set; } public string InfluxUrl { get; set; }
public string InfluxDb { get; set; } public string InfluxDb { get; set; }

View File

@ -102,4 +102,12 @@ public static class CoreMetrics
MeasurementUnit = Unit.Connections, MeasurementUnit = Unit.Connections,
Context = "Database" Context = "Database"
}; };
}
/// <summary>
/// Snapshot of the statistics a single bot cluster reports.
/// One instance per cluster is serialized to JSON and stored in the
/// "pluralkit:cluster_stats" Redis hash; the scheduled tasks worker
/// deserializes all entries and sums each value across clusters.
/// </summary>
/// <remarks>
/// Members are auto-properties rather than public fields so the type follows the
/// usual DTO convention and stays usable with serializers that ignore fields.
/// The JSON member names are unchanged.
/// </remarks>
public record ClusterMetricInfo
{
/// <summary>Number of guilds counted by the reporting cluster.</summary>
public int GuildCount { get; set; }

/// <summary>Number of channels counted by the reporting cluster.</summary>
public int ChannelCount { get; set; }

/// <summary>Database connection count reported by the cluster.</summary>
public int DatabaseConnectionCount { get; set; }

/// <summary>Size of the cluster's webhook cache.</summary>
public int WebhookCacheSize { get; set; }
}

View File

@ -0,0 +1,26 @@
using App.Metrics;
using App.Metrics.Gauge;
/// <summary>
/// Gauge definitions in the "Bot" context. Every property access
/// returns a freshly constructed <see cref="GaugeOptions"/> instance.
/// </summary>
public static class Metrics
{
/// <summary>Options for the "Guilds" gauge (no measurement unit).</summary>
public static GaugeOptions Guilds => BotGauge("Guilds", Unit.None);

/// <summary>Options for the "Channels" gauge (no measurement unit).</summary>
public static GaugeOptions Channels => BotGauge("Channels", Unit.None);

/// <summary>Options for the "Webhook Cache Size" gauge, measured in items.</summary>
public static GaugeOptions WebhookCacheSize => BotGauge("Webhook Cache Size", Unit.Items);

// Builds a new gauge definition in the shared "Bot" context.
private static GaugeOptions BotGauge(string gaugeName, Unit unit)
{
var options = new GaugeOptions();
options.Name = gaugeName;
options.MeasurementUnit = unit;
options.Context = "Bot";
return options;
}
}

View File

@ -19,6 +19,11 @@ internal class Startup
await BuildInfoService.LoadVersion(); await BuildInfoService.LoadVersion();
var services = BuildContainer(config); var services = BuildContainer(config);
var cfg = services.Resolve<CoreConfig>();
if (cfg.UseRedisMetrics)
await services.Resolve<RedisService>().InitAsync(cfg);
services.Resolve<TaskHandler>().Run(); services.Resolve<TaskHandler>().Run();
await Task.Delay(-1); await Task.Delay(-1);

View File

@ -1,11 +1,16 @@
using System; using System;
using System.Linq;
using System.Diagnostics; using System.Diagnostics;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using App.Metrics;
using NodaTime; using NodaTime;
using NodaTime.Extensions; using NodaTime.Extensions;
using Newtonsoft.Json;
using PluralKit.Core; using PluralKit.Core;
using Serilog; using Serilog;
@ -16,16 +21,23 @@ public class TaskHandler
{ {
private static readonly Duration CommandMessageRetention = Duration.FromHours(24); private static readonly Duration CommandMessageRetention = Duration.FromHours(24);
private readonly IDatabase _db; private readonly IDatabase _db;
private readonly RedisService _redis;
private readonly bool _useRedisMetrics;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IMetrics _metrics;
private readonly ModelRepository _repo; private readonly ModelRepository _repo;
private Timer _periodicTask; private Timer _periodicTask;
public TaskHandler(ILogger logger, IDatabase db, ModelRepository repo) public TaskHandler(ILogger logger, IMetrics metrics, CoreConfig config, IDatabase db, RedisService redis, ModelRepository repo)
{ {
_logger = logger; _logger = logger;
_metrics = metrics;
_db = db; _db = db;
_redis = redis;
_repo = repo; _repo = repo;
_useRedisMetrics = config.UseRedisMetrics;
} }
public void Run() public void Run()
@ -49,6 +61,10 @@ public class TaskHandler
_logger.Information("Updating database stats..."); _logger.Information("Updating database stats...");
await _repo.UpdateStats(); await _repo.UpdateStats();
// Collect bot cluster statistics from Redis (if it's enabled)
if (_useRedisMetrics)
await CollectBotStats();
// Clean up message cache in postgres // Clean up message cache in postgres
await CleanupOldMessages(); await CleanupOldMessages();
@ -56,6 +72,32 @@ public class TaskHandler
_logger.Information("Ran scheduled tasks in {Time}", stopwatch.ElapsedDuration()); _logger.Information("Ran scheduled tasks in {Time}", stopwatch.ElapsedDuration());
} }
/// <summary>
/// Reads every entry of the "pluralkit:cluster_stats" Redis hash, sums the
/// per-cluster values, and publishes the totals as gauges. Also publishes the
/// aggregate database counts returned by <c>_repo.GetStats()</c>.
/// </summary>
/// <returns>A task that completes once all gauges have been set.</returns>
private async Task CollectBotStats()
{
var redisStats = await _redis.Connection.GetDatabase().HashGetAllAsync("pluralkit:cluster_stats");

// Materialize once: a bare Select() is lazy, so each Sum() below would otherwise
// re-deserialize every hash entry. Values that deserialize to null (e.g. an empty
// string or a literal JSON null) are skipped so the sums cannot hit a null reference.
var stats = redisStats
.Select(v => JsonConvert.DeserializeObject<ClusterMetricInfo>(v.Value))
.Where(info => info != null)
.ToList();

_metrics.Measure.Gauge.SetValue(Metrics.Guilds, stats.Sum(x => x.GuildCount));
_metrics.Measure.Gauge.SetValue(Metrics.Channels, stats.Sum(x => x.ChannelCount));

// Aggregate DB stats
// just fetching from database here - actual updating of the data is done elsewhere
var counts = await _repo.GetStats();
_metrics.Measure.Gauge.SetValue(CoreMetrics.SystemCount, counts.SystemCount);
_metrics.Measure.Gauge.SetValue(CoreMetrics.MemberCount, counts.MemberCount);
_metrics.Measure.Gauge.SetValue(CoreMetrics.GroupCount, counts.GroupCount);
_metrics.Measure.Gauge.SetValue(CoreMetrics.SwitchCount, counts.SwitchCount);
_metrics.Measure.Gauge.SetValue(CoreMetrics.MessageCount, counts.MessageCount);

// Database info
// this is pretty much always inaccurate but oh well
_metrics.Measure.Gauge.SetValue(CoreMetrics.DatabaseConnections, stats.Sum(x => x.DatabaseConnectionCount));

// Other shiz
_metrics.Measure.Gauge.SetValue(Metrics.WebhookCacheSize, stats.Sum(x => x.WebhookCacheSize));
}
private async Task CleanupOldMessages() private async Task CleanupOldMessages()
{ {
var deleteThresholdInstant = SystemClock.Instance.GetCurrentInstant() - CommandMessageRetention; var deleteThresholdInstant = SystemClock.Instance.GetCurrentInstant() - CommandMessageRetention;