From e7ae9dbe44108ae7964669b1d1744f63509c7b92 Mon Sep 17 00:00:00 2001 From: Ske Date: Mon, 1 Feb 2021 14:26:39 +0100 Subject: [PATCH] Respect shard concurrency limit --- Myriad/Gateway/Cluster.cs | 39 ++++++++++++++++++----- Myriad/Types/Gateway/SessionStartLimit.cs | 1 + 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/Myriad/Gateway/Cluster.cs b/Myriad/Gateway/Cluster.cs index 220eadc1..cbb0bd51 100644 --- a/Myriad/Gateway/Cluster.cs +++ b/Myriad/Gateway/Cluster.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Myriad.Types; @@ -46,33 +47,55 @@ namespace Myriad.Gateway public async Task Start(GatewayInfo.Bot info, ClusterSessionState? lastState = null) { if (lastState != null && lastState.Shards.Count == info.Shards) - await Resume(info.Url, lastState); + await Resume(info.Url, lastState, info.SessionStartLimit.MaxConcurrency); else - await Start(info.Url, info.Shards); + await Start(info.Url, info.Shards, info.SessionStartLimit.MaxConcurrency); } - public async Task Resume(string url, ClusterSessionState sessionState) + public async Task Resume(string url, ClusterSessionState sessionState, int concurrency) { _logger.Information("Resuming session with {ShardCount} shards at {Url}", sessionState.Shards.Count, url); foreach (var shardState in sessionState.Shards) CreateAndAddShard(url, shardState.Shard, shardState.Session); - await StartShards(); + await StartShards(concurrency); } - public async Task Start(string url, int shardCount) + public async Task Start(string url, int shardCount, int concurrency) { _logger.Information("Starting {ShardCount} shards at {Url}", shardCount, url); for (var i = 0; i < shardCount; i++) CreateAndAddShard(url, new ShardInfo(i, shardCount), null); - await StartShards(); + await StartShards(concurrency); } - private async Task StartShards() + private async Task StartShards(int concurrency) { + var lastTime = DateTimeOffset.UtcNow; + var identifyCalls = 0; + _logger.Information("Connecting shards..."); - await Task.WhenAll(_shards.Values.Select(s => s.Start())); + foreach (var shard in _shards.Values) + { + if (identifyCalls >= concurrency) + { + var timeout = lastTime + TimeSpan.FromSeconds(5.5); + var delay = timeout - DateTimeOffset.UtcNow; + + if (delay > TimeSpan.Zero) + { + _logger.Information("Hit shard concurrency limit, waiting {Delay}", delay); + await Task.Delay(delay); + } + + identifyCalls = 0; + lastTime = DateTimeOffset.UtcNow; + } + + await shard.Start(); + identifyCalls++; + } } private void CreateAndAddShard(string url, ShardInfo shardInfo, ShardSessionInfo? session) diff --git a/Myriad/Types/Gateway/SessionStartLimit.cs b/Myriad/Types/Gateway/SessionStartLimit.cs index 381c7cd9..b5da4770 100644 --- a/Myriad/Types/Gateway/SessionStartLimit.cs +++ b/Myriad/Types/Gateway/SessionStartLimit.cs @@ -5,5 +5,6 @@ public int Total { get; init; } public int Remaining { get; init; } public int ResetAfter { get; init; } + public int MaxConcurrency { get; init; } } } \ No newline at end of file