From ae9ed0f4eec88861ea31a6112f5ff0cdc8372832 Mon Sep 17 00:00:00 2001 From: Ske Date: Thu, 10 Jun 2021 12:52:47 +0200 Subject: [PATCH] Store stard status in the database --- PluralKit.Bot/Services/ShardInfoService.cs | 50 ++++++++++++++----- PluralKit.Core/Database/Database.cs | 2 +- PluralKit.Core/Database/Migrations/14.sql | 15 ++++++ .../Repository/ModelRepository.Shards.cs | 30 +++++++++++ PluralKit.Core/Models/PKShardInfo.cs | 19 +++++++ 5 files changed, 103 insertions(+), 13 deletions(-) create mode 100644 PluralKit.Core/Database/Migrations/14.sql create mode 100644 PluralKit.Core/Database/Repository/ModelRepository.Shards.cs create mode 100644 PluralKit.Core/Models/PKShardInfo.cs diff --git a/PluralKit.Bot/Services/ShardInfoService.cs b/PluralKit.Bot/Services/ShardInfoService.cs index 5fdc2aa1..7ec05562 100644 --- a/PluralKit.Bot/Services/ShardInfoService.cs +++ b/PluralKit.Bot/Services/ShardInfoService.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.WebSockets; +using System.Threading.Tasks; using App.Metrics; @@ -10,6 +11,8 @@ using Myriad.Gateway; using NodaTime; using NodaTime.Extensions; +using PluralKit.Core; + using Serilog; namespace PluralKit.Bot @@ -32,11 +35,16 @@ namespace PluralKit.Bot private readonly ILogger _logger; private readonly Cluster _client; private readonly Dictionary _shardInfo = new(); + + private readonly IDatabase _db; + private readonly ModelRepository _repo; - public ShardInfoService(ILogger logger, Cluster client, IMetrics metrics) + public ShardInfoService(ILogger logger, Cluster client, IMetrics metrics, IDatabase db, ModelRepository repo) { _client = client; _metrics = metrics; + _db = db; + _repo = repo; _logger = logger.ForContext(); } @@ -67,8 +75,8 @@ namespace PluralKit.Bot // Call our own SocketOpened listener manually (and then attach the listener properly) // Register listeners for new shards - shard.Resumed += () => Resumed(shard); - shard.Ready += () => Ready(shard); + shard.Resumed += () => ReadyOrResumed(shard); + shard.Ready += () => ReadyOrResumed(shard); shard.SocketClosed += (closeStatus, message) => SocketClosed(shard, closeStatus, message); shard.HeartbeatReceived += latency => Heartbeated(shard, latency); @@ -86,20 +94,18 @@ namespace PluralKit.Bot return info; } - private void Resumed(Shard shard) - { - var info = TryGetShard(shard); - info.LastConnectionTime = SystemClock.Instance.GetCurrentInstant(); - info.Connected = true; - ReportShardStatus(); - } - - private void Ready(Shard shard) + private void ReadyOrResumed(Shard shard) { var info = TryGetShard(shard); info.LastConnectionTime = SystemClock.Instance.GetCurrentInstant(); info.Connected = true; ReportShardStatus(); + + _ = ExecuteWithDatabase(async c => + { + await _repo.SetShardStatus(c, shard.ShardId, PKShardInfo.ShardStatus.Up); + await _repo.RegisterShardConnection(c, shard.ShardId); + }); } private void SocketClosed(Shard shard, WebSocketCloseStatus? closeStatus, string message) @@ -108,6 +114,9 @@ namespace PluralKit.Bot info.DisconnectionCount++; info.Connected = false; ReportShardStatus(); + + _ = ExecuteWithDatabase(c => + _repo.SetShardStatus(c, shard.ShardId, PKShardInfo.ShardStatus.Down)); } private void Heartbeated(Shard shard, TimeSpan latency) @@ -116,6 +125,23 @@ namespace PluralKit.Bot info.LastHeartbeatTime = SystemClock.Instance.GetCurrentInstant(); info.Connected = true; info.ShardLatency = latency.ToDuration(); + + _ = ExecuteWithDatabase(c => + _repo.RegisterShardHeartbeat(c, shard.ShardId, latency.ToDuration())); + } + + private async Task ExecuteWithDatabase(Func fn) + { + // wrapper function to log errors because we "async void" it at call site :( + try + { + await using var conn = await _db.Obtain(); + await fn(conn); + } + catch (Exception e) + { + _logger.Error(e, "Error persisting shard status"); + } } public ShardInfo GetShardInfo(Shard shard) => _shardInfo[shard.ShardId]; diff --git a/PluralKit.Core/Database/Database.cs b/PluralKit.Core/Database/Database.cs index 043b9e68..14eee429 100644 --- a/PluralKit.Core/Database/Database.cs +++ b/PluralKit.Core/Database/Database.cs @@ -19,7 +19,7 @@ namespace PluralKit.Core internal class Database: IDatabase { private const string RootPath = "PluralKit.Core.Database"; // "resource path" root for SQL files - private const int TargetSchemaVersion = 13; + private const int TargetSchemaVersion = 14; private readonly CoreConfig _config; private readonly ILogger _logger; diff --git a/PluralKit.Core/Database/Migrations/14.sql b/PluralKit.Core/Database/Migrations/14.sql new file mode 100644 index 00000000..ad7c8c26 --- /dev/null +++ b/PluralKit.Core/Database/Migrations/14.sql @@ -0,0 +1,15 @@ +-- SCHEMA VERSION 14: 2021-06-10 -- +-- Add shard status table -- + +create table shards ( + id int not null primary key, + + -- 0 = down, 1 = up + status smallint not null default 0, + + ping float, + last_heartbeat timestamptz, + last_connection timestamptz +); + +update info set schema_version = 14; \ No newline at end of file diff --git a/PluralKit.Core/Database/Repository/ModelRepository.Shards.cs b/PluralKit.Core/Database/Repository/ModelRepository.Shards.cs new file mode 100644 index 00000000..758d8d9c --- /dev/null +++ b/PluralKit.Core/Database/Repository/ModelRepository.Shards.cs @@ -0,0 +1,30 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Dapper; + +using NodaTime; + +namespace PluralKit.Core +{ + public partial class ModelRepository + { + public Task> GetShards(IPKConnection conn) => + conn.QueryAsync("select * from shards"); + + public Task SetShardStatus(IPKConnection conn, int shard, PKShardInfo.ShardStatus status) => + conn.ExecuteAsync( + "insert into shards (id, status) values (@Id, @Status) on conflict (id) do update set status = @Status", + new {Id = shard, Status = status}); + + public Task RegisterShardHeartbeat(IPKConnection conn, int shard, Duration ping) => + conn.ExecuteAsync( + "insert into shards (id, last_heartbeat, ping) values (@Id, now(), @Ping) on conflict (id) do update set last_heartbeat = now(), ping = @Ping", + new {Id = shard, Ping = ping.TotalSeconds}); + + public Task RegisterShardConnection(IPKConnection conn, int shard) => + conn.ExecuteAsync( + "insert into shards (id, last_connection) values (@Id, now()) on conflict (id) do update set last_connection = now()", + new {Id = shard}); + } +} \ No newline at end of file diff --git a/PluralKit.Core/Models/PKShardInfo.cs b/PluralKit.Core/Models/PKShardInfo.cs new file mode 100644 index 00000000..fc456e76 --- /dev/null +++ b/PluralKit.Core/Models/PKShardInfo.cs @@ -0,0 +1,19 @@ +using NodaTime; + +namespace PluralKit.Core +{ + public class PKShardInfo + { + public int Id { get; } + public ShardStatus Status { get; } + public float? Ping { get; } + public Instant? LastHeartbeat { get; } + public Instant? LastConnection { get; } + + public enum ShardStatus + { + Down = 0, + Up = 1 + } + } +} \ No newline at end of file