Attempt at a more resilient shard handler

Signed-off-by: Ske <voltasalt@gmail.com>
This commit is contained in:
Ske 2021-06-09 12:24:55 +02:00
parent 8b948bcfbb
commit 6ea1309ae0
2 changed files with 46 additions and 14 deletions

View File

@ -74,6 +74,7 @@ namespace Myriad.Gateway
try try
{ {
await ConnectInner(); await ConnectInner();
await HandleConnectionOpened(); await HandleConnectionOpened();
while (_conn.State == WebSocketState.Open) while (_conn.State == WebSocketState.Open)
@ -92,6 +93,7 @@ namespace Myriad.Gateway
if (_reconnectDelay > TimeSpan.Zero) if (_reconnectDelay > TimeSpan.Zero)
await Task.Delay(_reconnectDelay); await Task.Delay(_reconnectDelay);
_reconnectDelay = TimeSpan.Zero;
} }
catch (Exception e) catch (Exception e)
{ {
@ -121,10 +123,22 @@ namespace Myriad.Gateway
private async Task ConnectInner() private async Task ConnectInner()
{ {
await _ratelimiter.Acquire(_info.ShardId); while (true)
{
await _ratelimiter.Acquire(_info.ShardId);
_logger.Information("Shard {ShardId}: Connecting to WebSocket", _info.ShardId); _logger.Information("Shard {ShardId}: Connecting to WebSocket", _info.ShardId);
await _conn.Connect(_url, default); try
{
await _conn.Connect(_url, default);
break;
}
catch (WebSocketException e)
{
_logger.Error(e, "Shard {ShardId}: Error connecting to WebSocket, retrying in 5 seconds...", _info.ShardId);
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
} }
private async Task DisconnectInner(WebSocketCloseStatus closeStatus) private async Task DisconnectInner(WebSocketCloseStatus closeStatus)

View File

@ -39,7 +39,8 @@ namespace Myriad.Gateway
public async Task Send(GatewayPacket packet) public async Task Send(GatewayPacket packet)
{ {
if (_client == null || _client.State != WebSocketState.Open) // from `ManagedWebSocket.s_validSendStates`
if (_client is not {State: WebSocketState.Open or WebSocketState.CloseReceived})
return; return;
try try
@ -60,7 +61,8 @@ namespace Myriad.Gateway
public async Task<GatewayPacket?> Read() public async Task<GatewayPacket?> Read()
{ {
if (_client == null || _client.State != WebSocketState.Open) // from `ManagedWebSocket.s_validReceiveStates`
if (_client is not {State: WebSocketState.Open or WebSocketState.CloseSent})
return null; return null;
try try
@ -71,6 +73,8 @@ namespace Myriad.Gateway
catch (Exception e) catch (Exception e)
{ {
_logger.Error(e, "Error reading from WebSocket"); _logger.Error(e, "Error reading from WebSocket");
// force close so we can "reset"
await CloseInner(WebSocketCloseStatus.NormalClosure, null);
} }
return null; return null;
@ -86,18 +90,32 @@ namespace Myriad.Gateway
if (_client == null) if (_client == null)
return; return;
if (_client.State != WebSocketState.Connecting && _client.State != WebSocketState.Open) var client = _client;
return; _client = null;
// Close with timeout, mostly to work around https://github.com/dotnet/runtime/issues/51590 // from `ManagedWebSocket.s_validCloseStates`
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); if (client.State is WebSocketState.Open or WebSocketState.CloseReceived or WebSocketState.CloseSent)
{
// Close with timeout, mostly to work around https://github.com/dotnet/runtime/issues/51590
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
try
{
await client.CloseAsync(closeStatus, description, cts.Token);
}
catch (Exception e)
{
_logger.Error(e, "Error closing WebSocket connection");
}
}
// This shouldn't need to be wrapped in a try/catch but doing it anyway :/
try try
{ {
await _client.CloseAsync(closeStatus, description, cts.Token); client.Dispose();
} }
catch (Exception e) catch (Exception e)
{ {
_logger.Error(e, "Error closing WebSocket connection"); _logger.Error(e, "Error disposing WebSocket connection");
} }
} }
} }