2021-01-30 00:07:43 +00:00
|
|
|
using System.Net.WebSockets;
|
2019-12-22 11:50:47 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
using Google.Protobuf;
|
2020-06-14 20:19:12 +00:00
|
|
|
|
2021-01-15 10:29:43 +00:00
|
|
|
using Myriad.Gateway;
|
|
|
|
|
2019-12-22 11:50:47 +00:00
|
|
|
using NodaTime;
|
2022-01-22 08:52:52 +00:00
|
|
|
|
|
|
|
using StackExchange.Redis;
|
2020-05-01 15:30:12 +00:00
|
|
|
|
2021-06-10 10:52:47 +00:00
|
|
|
using PluralKit.Core;
|
|
|
|
|
2020-05-01 15:30:12 +00:00
|
|
|
using Serilog;
|
2019-12-22 11:50:47 +00:00
|
|
|
|
2021-11-27 02:10:56 +00:00
|
|
|
namespace PluralKit.Bot;
|
|
|
|
|
|
|
|
public class ShardInfoService
|
2019-12-22 11:50:47 +00:00
|
|
|
{
|
2022-02-07 16:05:55 +00:00
|
|
|
private readonly int? _clusterId;
|
2021-11-27 02:10:56 +00:00
|
|
|
private readonly ILogger _logger;
|
2022-01-22 08:52:52 +00:00
|
|
|
private readonly Cluster _client;
|
|
|
|
private readonly RedisService _redis;
|
2021-11-27 02:10:56 +00:00
|
|
|
private readonly Dictionary<int, ShardInfo> _shardInfo = new();
|
|
|
|
|
2022-02-07 16:05:55 +00:00
|
|
|
public ShardInfoService(ILogger logger, Cluster client, RedisService redis, BotConfig config)
|
2019-12-22 11:50:47 +00:00
|
|
|
{
|
2021-11-27 02:10:56 +00:00
|
|
|
_logger = logger.ForContext<ShardInfoService>();
|
2022-01-22 08:52:52 +00:00
|
|
|
_client = client;
|
|
|
|
_redis = redis;
|
2022-02-07 16:05:55 +00:00
|
|
|
_clusterId = config.Cluster?.NodeIndex;
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
2019-12-22 11:50:47 +00:00
|
|
|
|
2021-11-27 02:10:56 +00:00
|
|
|
public void Init()
|
|
|
|
{
|
|
|
|
// We initialize this before any shards are actually created and connected
|
|
|
|
// This means the client won't know the shard count, so we attach a listener every time a shard gets connected
|
|
|
|
_client.ShardCreated += InitializeShard;
|
|
|
|
}
|
2021-08-27 15:03:47 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
public async Task<IEnumerable<ShardState>> GetShards()
|
2021-11-27 02:10:56 +00:00
|
|
|
{
|
2022-02-04 19:53:56 +00:00
|
|
|
if (_redis.Connection == null)
|
|
|
|
return new ShardState[] { };
|
2022-01-22 08:52:52 +00:00
|
|
|
var db = _redis.Connection.GetDatabase();
|
|
|
|
var redisInfo = await db.HashGetAllAsync("pluralkit:shardstatus");
|
|
|
|
return redisInfo.Select(x => Proto.Unmarshal<ShardState>(x.Value));
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
2020-05-01 15:30:12 +00:00
|
|
|
|
2021-11-27 02:10:56 +00:00
|
|
|
private void InitializeShard(Shard shard)
|
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
_ = Inner();
|
|
|
|
|
|
|
|
async Task Inner()
|
2019-12-22 11:50:47 +00:00
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
var db = _redis.Connection.GetDatabase();
|
|
|
|
var redisInfo = await db.HashGetAsync("pluralkit::shardstatus", shard.ShardId);
|
|
|
|
|
2021-11-27 02:10:56 +00:00
|
|
|
// Skip adding listeners if we've seen this shard & already added listeners to it
|
2022-01-22 08:52:52 +00:00
|
|
|
if (redisInfo.HasValue)
|
2021-11-27 02:10:56 +00:00
|
|
|
return;
|
2020-06-14 20:19:12 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
// latency = 0 because otherwise shard 0 would serialize to an empty array, thanks protobuf
|
|
|
|
var state = new ShardState() { ShardId = shard.ShardId, Up = false, Latency = 1 };
|
2022-02-07 16:05:55 +00:00
|
|
|
if (_clusterId != null)
|
|
|
|
state.ClusterId = _clusterId.Value;
|
2019-12-22 11:50:47 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
// Register listeners for new shard
|
|
|
|
shard.Resumed += () => ReadyOrResumed(shard);
|
|
|
|
shard.Ready += () => ReadyOrResumed(shard);
|
|
|
|
shard.SocketClosed += (closeStatus, message) => SocketClosed(shard, closeStatus, message);
|
|
|
|
shard.HeartbeatReceived += latency => Heartbeated(shard, latency);
|
2020-05-01 18:27:51 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
// Register that we've seen it
|
|
|
|
await db.HashSetAsync("pluralkit:shardstatus", state.HashWrapper());
|
|
|
|
}
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
2019-12-22 11:50:47 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
private async Task<ShardState?> TryGetShard(Shard shard)
|
2021-11-27 02:10:56 +00:00
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
var db = _redis.Connection.GetDatabase();
|
|
|
|
var redisInfo = await db.HashGetAsync("pluralkit:shardstatus", shard.ShardId);
|
|
|
|
if (redisInfo.HasValue)
|
|
|
|
return Proto.Unmarshal<ShardState>(redisInfo);
|
|
|
|
return null;
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
2021-08-27 15:03:47 +00:00
|
|
|
|
2021-11-27 02:10:56 +00:00
|
|
|
private void ReadyOrResumed(Shard shard)
|
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
_ = DoAsync(async () =>
|
2020-05-01 15:30:12 +00:00
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
var info = await TryGetShard(shard);
|
|
|
|
|
|
|
|
info.LastConnection = (int)SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds();
|
|
|
|
info.Up = true;
|
|
|
|
|
|
|
|
var db = _redis.Connection.GetDatabase();
|
|
|
|
await db.HashSetAsync("pluralkit:shardstatus", info.HashWrapper());
|
2021-11-27 02:10:56 +00:00
|
|
|
});
|
|
|
|
}
|
2021-08-27 15:03:47 +00:00
|
|
|
|
2021-11-27 02:10:56 +00:00
|
|
|
private void SocketClosed(Shard shard, WebSocketCloseStatus? closeStatus, string message)
|
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
_ = DoAsync(async () =>
|
|
|
|
{
|
|
|
|
var info = await TryGetShard(shard);
|
2021-11-27 02:10:56 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
info.DisconnectionCount++;
|
|
|
|
info.Up = false;
|
|
|
|
|
|
|
|
var db = _redis.Connection.GetDatabase();
|
|
|
|
await db.HashSetAsync("pluralkit:shardstatus", info.HashWrapper());
|
|
|
|
});
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
private void Heartbeated(Shard shard, TimeSpan latency)
|
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
_ = DoAsync(async () =>
|
|
|
|
{
|
|
|
|
var info = await TryGetShard(shard);
|
2021-11-27 02:10:56 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
info.LastHeartbeat = (int)SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds();
|
|
|
|
info.Up = true;
|
|
|
|
info.Latency = (int)latency.TotalMilliseconds;
|
|
|
|
|
|
|
|
var db = _redis.Connection.GetDatabase();
|
|
|
|
await db.HashSetAsync("pluralkit:shardstatus", info.HashWrapper());
|
|
|
|
});
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
2021-06-10 10:52:47 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
private async Task DoAsync(Func<Task> fn)
|
2021-11-27 02:10:56 +00:00
|
|
|
{
|
|
|
|
// wrapper function to log errors because we "async void" it at call site :(
|
|
|
|
try
|
|
|
|
{
|
2022-01-22 08:52:52 +00:00
|
|
|
await fn();
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
|
|
|
catch (Exception e)
|
2021-06-10 10:52:47 +00:00
|
|
|
{
|
2021-11-27 02:10:56 +00:00
|
|
|
_logger.Error(e, "Error persisting shard status");
|
2019-12-22 11:50:47 +00:00
|
|
|
}
|
2021-11-27 02:10:56 +00:00
|
|
|
}
|
2022-01-22 08:52:52 +00:00
|
|
|
}
|
2020-05-01 15:30:12 +00:00
|
|
|
|
2022-01-22 08:52:52 +00:00
|
|
|
public static class RedisExt
|
|
|
|
{
|
|
|
|
// convenience method
|
|
|
|
public static HashEntry[] HashWrapper(this ShardState state)
|
|
|
|
=> new[] { new HashEntry(state.ShardId, state.ToByteArray()) };
|
2019-12-22 11:50:47 +00:00
|
|
|
}
|