PluralKit/Myriad/Gateway/Shard.cs

215 lines
7.2 KiB
C#
Raw Permalink Normal View History

2020-12-22 12:15:26 +00:00
using System.Net.WebSockets;
using System.Text.Json;
2021-06-09 14:22:10 +00:00
using Myriad.Gateway.Limit;
2021-04-29 09:10:19 +00:00
using Myriad.Gateway.State;
2020-12-22 12:15:26 +00:00
using Myriad.Serialization;
using Myriad.Types;
using Serilog;
2021-06-10 12:21:05 +00:00
using Serilog.Context;
2020-12-22 12:15:26 +00:00
namespace Myriad.Gateway;
public class Shard
2020-12-22 12:15:26 +00:00
{
private const string LibraryName = "Myriad (for PluralKit)";
private readonly GatewaySettings _settings;
private readonly ShardInfo _info;
private readonly IGatewayRatelimiter _ratelimiter;
private readonly string _url;
private readonly ILogger _logger;
private readonly ShardStateManager _stateManager;
private readonly JsonSerializerOptions _jsonSerializerOptions;
private readonly ShardConnection _conn;
public int ShardId => _info.ShardId;
public ShardState State => _stateManager.State;
public TimeSpan? Latency => _stateManager.Latency;
public User? User => _stateManager.User;
public ApplicationPartial? Application => _stateManager.Application;
// TODO: I wanna get rid of these or move them at some point
public event Func<IGatewayEvent, Task>? OnEventReceived;
public event Action<TimeSpan>? HeartbeatReceived;
public event Action? SocketOpened;
public event Action? Resumed;
public event Action? Ready;
public event Action<WebSocketCloseStatus?, string?>? SocketClosed;
private TimeSpan _reconnectDelay = TimeSpan.Zero;
private Task? _worker;
private GatewayStatusUpdate? _presence { get; init; }
public Shard(GatewaySettings settings, ShardInfo info, IGatewayRatelimiter ratelimiter, string url, ILogger logger, GatewayStatusUpdate? presence = null)
2020-12-22 12:15:26 +00:00
{
_jsonSerializerOptions = new JsonSerializerOptions().ConfigureForMyriad();
_settings = settings;
_info = info;
_presence = presence;
_ratelimiter = ratelimiter;
_url = url;
_logger = logger.ForContext<Shard>().ForContext("ShardId", info.ShardId);
_stateManager = new ShardStateManager(info, _jsonSerializerOptions, logger)
2020-12-22 12:15:26 +00:00
{
HandleEvent = HandleEvent,
SendHeartbeat = SendHeartbeat,
SendIdentify = SendIdentify,
SendResume = SendResume,
Connect = ConnectInner,
Reconnect = Reconnect,
};
_stateManager.OnHeartbeatReceived += latency =>
{
HeartbeatReceived?.Invoke(latency);
};
2020-12-22 12:15:26 +00:00
_conn = new ShardConnection(_jsonSerializerOptions, _logger, info.ShardId);
}
2021-08-27 15:03:47 +00:00
private async Task ShardLoop()
{
// may be superfluous but this adds shard id to ambient context which is nice
using var _ = LogContext.PushProperty("ShardId", _info.ShardId);
2021-08-27 15:03:47 +00:00
while (true)
{
try
2021-04-29 09:10:19 +00:00
{
await ConnectInner();
2021-08-27 15:03:47 +00:00
await HandleConnectionOpened();
2020-12-22 12:15:26 +00:00
while (_conn.State == WebSocketState.Open)
{
var packet = await _conn.Read();
if (packet == null)
break;
2020-12-22 12:15:26 +00:00
await _stateManager.HandlePacketReceived(packet);
}
2021-08-27 15:03:47 +00:00
await HandleConnectionClosed(_conn.CloseStatus, _conn.CloseStatusDescription);
2020-12-22 12:15:26 +00:00
_logger.Information("Shard {ShardId}: Reconnecting after delay {ReconnectDelay}",
_info.ShardId, _reconnectDelay);
2021-04-29 09:10:19 +00:00
if (_reconnectDelay > TimeSpan.Zero)
await Task.Delay(_reconnectDelay);
_reconnectDelay = TimeSpan.Zero;
}
catch (Exception e)
{
_logger.Error(e, "Shard {ShardId}: Error in main shard loop, reconnecting in 5 seconds...", _info.ShardId);
2021-08-27 15:03:47 +00:00
// todo: exponential backoff here? this should never happen, ideally...
await Task.Delay(TimeSpan.FromSeconds(5));
2021-04-29 09:10:19 +00:00
}
2020-12-22 12:15:26 +00:00
}
}
2021-08-27 15:03:47 +00:00
public async Task Start()
{
if (_worker == null)
_worker = ShardLoop();
2021-06-09 14:22:10 +00:00
// Ideally we'd stagger the startups so we don't smash the websocket but that's difficult with the
// identify rate limiter so this is the best we can do rn, maybe?
await Task.Delay(200);
}
2020-12-22 12:15:26 +00:00
public async Task UpdateStatus(GatewayStatusUpdate payload)
=> await _conn.Send(new GatewayPacket
2020-12-22 12:15:26 +00:00
{
Opcode = GatewayOpcode.PresenceUpdate,
Payload = payload
});
2021-08-27 15:03:47 +00:00
private async Task ConnectInner()
{
while (true)
2020-12-22 12:15:26 +00:00
{
await _ratelimiter.Identify(_info.ShardId);
_logger.Information("Shard {ShardId}: Connecting to WebSocket", _info.ShardId);
try
{
await _conn.Connect(_url, default);
break;
}
catch (WebSocketException e)
{
_logger.Error(e, "Shard {ShardId}: Error connecting to WebSocket, retrying in 5 seconds...", _info.ShardId);
await Task.Delay(TimeSpan.FromSeconds(5));
}
2020-12-22 12:15:26 +00:00
}
}
2021-08-27 15:03:47 +00:00
private Task DisconnectInner(WebSocketCloseStatus closeStatus)
=> _conn.Disconnect(closeStatus, null);
2021-08-27 15:03:47 +00:00
private async Task SendIdentify()
=> await _conn.Send(new GatewayPacket
2020-12-22 12:15:26 +00:00
{
Opcode = GatewayOpcode.Identify,
Payload = new GatewayIdentify
2020-12-22 12:15:26 +00:00
{
Compress = false,
Intents = _settings.Intents,
Properties = new GatewayIdentify.ConnectionProperties
2020-12-22 12:15:26 +00:00
{
Browser = LibraryName,
Device = LibraryName,
Os = Environment.OSVersion.ToString()
},
Shard = _info,
Token = _settings.Token,
LargeThreshold = 50,
Presence = _presence,
}
});
2021-08-27 15:03:47 +00:00
private async Task SendResume((string SessionId, int? LastSeq) arg)
=> await _conn.Send(new GatewayPacket
2020-12-22 12:15:26 +00:00
{
Opcode = GatewayOpcode.Resume,
Payload = new GatewayResume(_settings.Token, arg.SessionId, arg.LastSeq ?? 0)
});
2021-08-27 15:03:47 +00:00
private async Task SendHeartbeat(int? lastSeq)
=> await _conn.Send(new GatewayPacket { Opcode = GatewayOpcode.Heartbeat, Payload = lastSeq });
2021-08-27 15:03:47 +00:00
private async Task Reconnect(WebSocketCloseStatus closeStatus, TimeSpan delay)
{
_reconnectDelay = delay;
await DisconnectInner(closeStatus);
}
2020-12-22 12:15:26 +00:00
private async Task HandleEvent(IGatewayEvent arg)
{
if (arg is ReadyEvent)
Ready?.Invoke();
if (arg is ResumedEvent)
Resumed?.Invoke();
2021-08-27 15:03:47 +00:00
await (OnEventReceived?.Invoke(arg) ?? Task.CompletedTask);
}
2020-12-22 12:15:26 +00:00
private async Task HandleConnectionOpened()
{
_logger.Information("Shard {ShardId}: Connection opened", _info.ShardId);
await _stateManager.HandleConnectionOpened();
SocketOpened?.Invoke();
}
2020-12-22 12:15:26 +00:00
private async Task HandleConnectionClosed(WebSocketCloseStatus? closeStatus, string? description)
{
_logger.Information("Shard {ShardId}: Connection closed ({CloseStatus}/{Description})",
_info.ShardId, closeStatus, description ?? "<null>");
await _stateManager.HandleConnectionClosed();
SocketClosed?.Invoke(closeStatus, description);
2020-12-22 12:15:26 +00:00
}
}