From 6ea1309ae0ce1406a6e3d7de9b0607a83bf3efc1 Mon Sep 17 00:00:00 2001 From: Ske Date: Wed, 9 Jun 2021 12:24:55 +0200 Subject: [PATCH] Attempt at a more resilient shard handler Signed-off-by: Ske --- Myriad/Gateway/Shard.cs | 22 ++++++++++++++---- Myriad/Gateway/ShardConnection.cs | 38 +++++++++++++++++++++++-------- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/Myriad/Gateway/Shard.cs b/Myriad/Gateway/Shard.cs index b0f91158..f90b0741 100644 --- a/Myriad/Gateway/Shard.cs +++ b/Myriad/Gateway/Shard.cs @@ -74,6 +74,7 @@ namespace Myriad.Gateway try { await ConnectInner(); + await HandleConnectionOpened(); while (_conn.State == WebSocketState.Open) @@ -84,7 +85,7 @@ namespace Myriad.Gateway await _stateManager.HandlePacketReceived(packet); } - + await HandleConnectionClosed(_conn.CloseStatus, _conn.CloseStatusDescription); _logger.Information("Shard {ShardId}: Reconnecting after delay {ReconnectDelay}", @@ -92,6 +93,7 @@ namespace Myriad.Gateway if (_reconnectDelay > TimeSpan.Zero) await Task.Delay(_reconnectDelay); + _reconnectDelay = TimeSpan.Zero; } catch (Exception e) { @@ -121,10 +123,22 @@ namespace Myriad.Gateway private async Task ConnectInner() { - await _ratelimiter.Acquire(_info.ShardId); + while (true) + { + await _ratelimiter.Acquire(_info.ShardId); - _logger.Information("Shard {ShardId}: Connecting to WebSocket", _info.ShardId); - await _conn.Connect(_url, default); + _logger.Information("Shard {ShardId}: Connecting to WebSocket", _info.ShardId); + try + { + await _conn.Connect(_url, default); + break; + } + catch (WebSocketException e) + { + _logger.Error(e, "Shard {ShardId}: Error connecting to WebSocket, retrying in 5 seconds...", _info.ShardId); + await Task.Delay(TimeSpan.FromSeconds(5)); + } + } } private async Task DisconnectInner(WebSocketCloseStatus closeStatus) diff --git a/Myriad/Gateway/ShardConnection.cs b/Myriad/Gateway/ShardConnection.cs index 250ef84b..3e20615d 100644 --- a/Myriad/Gateway/ShardConnection.cs +++ b/Myriad/Gateway/ShardConnection.cs @@ -39,7 +39,8 @@ namespace Myriad.Gateway public async Task Send(GatewayPacket packet) { - if (_client == null || _client.State != WebSocketState.Open) + // from `ManagedWebSocket.s_validSendStates` + if (_client is not {State: WebSocketState.Open or WebSocketState.CloseReceived}) return; try @@ -60,9 +61,10 @@ namespace Myriad.Gateway public async Task Read() { - if (_client == null || _client.State != WebSocketState.Open) + // from `ManagedWebSocket.s_validReceiveStates` + if (_client is not {State: WebSocketState.Open or WebSocketState.CloseSent}) return null; - + try { var (_, packet) = await _serializer.ReadPacket(_client); @@ -71,6 +73,8 @@ namespace Myriad.Gateway catch (Exception e) { _logger.Error(e, "Error reading from WebSocket"); + // force close so we can "reset" + await CloseInner(WebSocketCloseStatus.NormalClosure, null); } return null; @@ -85,19 +89,33 @@ namespace Myriad.Gateway { if (_client == null) return; + + var client = _client; + _client = null; + + // from `ManagedWebSocket.s_validCloseStates` + if (client.State is WebSocketState.Open or WebSocketState.CloseReceived or WebSocketState.CloseSent) + { + // Close with timeout, mostly to work around https://github.com/dotnet/runtime/issues/51590 + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + try + { + await client.CloseAsync(closeStatus, description, cts.Token); + } + catch (Exception e) + { + _logger.Error(e, "Error closing WebSocket connection"); + } + } - if (_client.State != WebSocketState.Connecting && _client.State != WebSocketState.Open) - return; - - // Close with timeout, mostly to work around https://github.com/dotnet/runtime/issues/51590 - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + // This shouldn't need to be wrapped in a try/catch but doing it anyway :/ try { - await _client.CloseAsync(closeStatus, description, cts.Token); + client.Dispose(); } catch (Exception e) { - _logger.Error(e, "Error closing WebSocket connection"); + _logger.Error(e, "Error disposing WebSocket connection"); } } }