PluralKit/Myriad/Gateway/Shard.cs

216 lines
7.5 KiB
C#
Raw Normal View History

2020-12-22 12:15:26 +00:00
using System;
using System.Net.WebSockets;
using System.Text.Json;
using System.Threading.Tasks;
2021-04-29 09:10:19 +00:00
using Myriad.Gateway.State;
2020-12-22 12:15:26 +00:00
using Myriad.Serialization;
using Myriad.Types;
using Serilog;
namespace Myriad.Gateway
{
2021-04-29 09:10:19 +00:00
public class Shard
2020-12-22 12:15:26 +00:00
{
2021-01-30 00:07:43 +00:00
private const string LibraryName = "Myriad (for PluralKit)";
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
private readonly GatewaySettings _settings;
private readonly ShardInfo _info;
private readonly ShardIdentifyRatelimiter _ratelimiter;
private readonly string _url;
2020-12-22 12:15:26 +00:00
private readonly ILogger _logger;
2021-04-29 09:10:19 +00:00
private readonly ShardStateManager _stateManager;
private readonly JsonSerializerOptions _jsonSerializerOptions;
private readonly ShardConnection _conn;
public int ShardId => _info.ShardId;
public ShardState State => _stateManager.State;
public TimeSpan? Latency => _stateManager.Latency;
public User? User => _stateManager.User;
public ApplicationPartial? Application => _stateManager.Application;
// TODO: I wanna get rid of these or move them at some point
public event Func<IGatewayEvent, Task>? OnEventReceived;
2021-01-30 00:07:43 +00:00
public event Action<TimeSpan>? HeartbeatReceived;
public event Action? SocketOpened;
public event Action? Resumed;
public event Action? Ready;
2021-04-29 09:10:19 +00:00
public event Action<WebSocketCloseStatus?, string?>? SocketClosed;
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
private TimeSpan _reconnectDelay = TimeSpan.Zero;
private Task? _worker;
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
public Shard(GatewaySettings settings, ShardInfo info, ShardIdentifyRatelimiter ratelimiter, string url, ILogger logger)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
_jsonSerializerOptions = new JsonSerializerOptions().ConfigureForMyriad();
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
_settings = settings;
_info = info;
_ratelimiter = ratelimiter;
_url = url;
_logger = logger;
_stateManager = new ShardStateManager(info, _jsonSerializerOptions, logger)
{
HandleEvent = HandleEvent,
SendHeartbeat = SendHeartbeat,
SendIdentify = SendIdentify,
SendResume = SendResume,
Connect = ConnectInner,
Reconnect = Reconnect,
};
_stateManager.OnHeartbeatReceived += latency =>
{
HeartbeatReceived?.Invoke(latency);
};
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
_conn = new ShardConnection(_jsonSerializerOptions, _logger);
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task ShardLoop()
2020-12-22 12:15:26 +00:00
{
while (true)
2021-04-29 09:10:19 +00:00
{
2020-12-22 12:15:26 +00:00
try
{
2021-04-29 09:10:19 +00:00
await ConnectInner();
2021-04-29 09:10:19 +00:00
await HandleConnectionOpened();
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
while (_conn.State == WebSocketState.Open)
{
var packet = await _conn.Read();
if (packet == null)
break;
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
await _stateManager.HandlePacketReceived(packet);
}
2021-04-29 09:10:19 +00:00
await HandleConnectionClosed(_conn.CloseStatus, _conn.CloseStatusDescription);
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
_logger.Information("Shard {ShardId}: Reconnecting after delay {ReconnectDelay}",
_info.ShardId, _reconnectDelay);
if (_reconnectDelay > TimeSpan.Zero)
await Task.Delay(_reconnectDelay);
_reconnectDelay = TimeSpan.Zero;
2020-12-22 12:15:26 +00:00
}
catch (Exception e)
{
2021-04-29 09:10:19 +00:00
_logger.Error(e, "Shard {ShardId}: Error in main shard loop, reconnecting in 5 seconds...", _info.ShardId);
// todo: exponential backoff here? this should never happen, ideally...
await Task.Delay(TimeSpan.FromSeconds(5));
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
}
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
public Task Start()
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
if (_worker == null)
_worker = ShardLoop();
return Task.CompletedTask;
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
public async Task UpdateStatus(GatewayStatusUpdate payload)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
await _conn.Send(new GatewayPacket
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
Opcode = GatewayOpcode.PresenceUpdate,
Payload = payload
});
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task ConnectInner()
2020-12-22 12:15:26 +00:00
{
while (true)
{
await _ratelimiter.Acquire(_info.ShardId);
2020-12-22 12:15:26 +00:00
_logger.Information("Shard {ShardId}: Connecting to WebSocket", _info.ShardId);
try
{
await _conn.Connect(_url, default);
break;
}
catch (WebSocketException e)
{
_logger.Error(e, "Shard {ShardId}: Error connecting to WebSocket, retrying in 5 seconds...", _info.ShardId);
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task DisconnectInner(WebSocketCloseStatus closeStatus)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
await _conn.Disconnect(closeStatus, null);
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task SendIdentify()
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
await _conn.Send(new GatewayPacket
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
Opcode = GatewayOpcode.Identify,
Payload = new GatewayIdentify
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
Compress = false,
Intents = _settings.Intents,
Properties = new GatewayIdentify.ConnectionProperties
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
Browser = LibraryName,
Device = LibraryName,
Os = Environment.OSVersion.ToString()
},
Shard = _info,
Token = _settings.Token,
LargeThreshold = 50
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
});
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task SendResume((string SessionId, int? LastSeq) arg)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
await _conn.Send(new GatewayPacket
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
Opcode = GatewayOpcode.Resume,
Payload = new GatewayResume(_settings.Token, arg.SessionId, arg.LastSeq ?? 0)
});
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task SendHeartbeat(int? lastSeq)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
await _conn.Send(new GatewayPacket {Opcode = GatewayOpcode.Heartbeat, Payload = lastSeq});
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task Reconnect(WebSocketCloseStatus closeStatus, TimeSpan delay)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
_reconnectDelay = delay;
await DisconnectInner(closeStatus);
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task HandleEvent(IGatewayEvent arg)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
if (arg is ReadyEvent)
Ready?.Invoke();
if (arg is ResumedEvent)
Resumed?.Invoke();
await (OnEventReceived?.Invoke(arg) ?? Task.CompletedTask);
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task HandleConnectionOpened()
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
_logger.Information("Shard {ShardId}: Connection opened", _info.ShardId);
await _stateManager.HandleConnectionOpened();
SocketOpened?.Invoke();
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task HandleConnectionClosed(WebSocketCloseStatus? closeStatus, string? description)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
_logger.Information("Shard {ShardId}: Connection closed ({CloseStatus}/{Description})",
_info.ShardId, closeStatus, description ?? "<null>");
await _stateManager.HandleConnectionClosed();
SocketClosed?.Invoke(closeStatus, description);
2020-12-22 12:15:26 +00:00
}
}
}