Respect shard concurrency limit
This commit is contained in:
		| @@ -2,6 +2,7 @@ | ||||
| using System.Collections.Concurrent; | ||||
| using System.Collections.Generic; | ||||
| using System.Linq; | ||||
| using System.Threading; | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using Myriad.Types; | ||||
| @@ -46,33 +47,55 @@ namespace Myriad.Gateway | ||||
|         public async Task Start(GatewayInfo.Bot info, ClusterSessionState? lastState = null) | ||||
|         { | ||||
|             if (lastState != null && lastState.Shards.Count == info.Shards) | ||||
|                 await Resume(info.Url, lastState); | ||||
|                 await Resume(info.Url, lastState, info.SessionStartLimit.MaxConcurrency); | ||||
|             else | ||||
|                 await Start(info.Url, info.Shards); | ||||
|                 await Start(info.Url, info.Shards, info.SessionStartLimit.MaxConcurrency); | ||||
|         } | ||||
|  | ||||
|         public async Task Resume(string url, ClusterSessionState sessionState) | ||||
|         public async Task Resume(string url, ClusterSessionState sessionState, int concurrency) | ||||
|         { | ||||
|             _logger.Information("Resuming session with {ShardCount} shards at {Url}", sessionState.Shards.Count, url); | ||||
|             foreach (var shardState in sessionState.Shards) | ||||
|                 CreateAndAddShard(url, shardState.Shard, shardState.Session); | ||||
|  | ||||
|             await StartShards(); | ||||
|             await StartShards(concurrency); | ||||
|         } | ||||
|  | ||||
|         public async Task Start(string url, int shardCount) | ||||
|         public async Task Start(string url, int shardCount, int concurrency) | ||||
|         { | ||||
|             _logger.Information("Starting {ShardCount} shards at {Url}", shardCount, url); | ||||
|             for (var i = 0; i < shardCount; i++) | ||||
|                 CreateAndAddShard(url, new ShardInfo(i, shardCount), null); | ||||
|  | ||||
|             await StartShards(); | ||||
|             await StartShards(concurrency); | ||||
|         } | ||||
|  | ||||
|         private async Task StartShards() | ||||
|         private async Task StartShards(int concurrency) | ||||
|         { | ||||
|             var lastTime = DateTimeOffset.UtcNow; | ||||
|             var identifyCalls = 0; | ||||
|              | ||||
|             _logger.Information("Connecting shards..."); | ||||
|             await Task.WhenAll(_shards.Values.Select(s => s.Start())); | ||||
|             foreach (var shard in _shards.Values) | ||||
|             { | ||||
|                 if (identifyCalls >= concurrency) | ||||
|                 { | ||||
|                     var timeout = lastTime + TimeSpan.FromSeconds(5.5); | ||||
|                     var delay = timeout - DateTimeOffset.UtcNow; | ||||
|  | ||||
|                     if (delay > TimeSpan.Zero) | ||||
|                     { | ||||
|                         _logger.Information("Hit shard concurrency limit, waiting {Delay}", delay); | ||||
|                         await Task.Delay(delay); | ||||
|                     } | ||||
|  | ||||
|                     identifyCalls = 0; | ||||
|                     lastTime = DateTimeOffset.UtcNow; | ||||
|                 } | ||||
|  | ||||
|                 await shard.Start(); | ||||
|                 identifyCalls++; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         private void CreateAndAddShard(string url, ShardInfo shardInfo, ShardSessionInfo? session) | ||||
|   | ||||
| @@ -5,5 +5,6 @@ | ||||
|         public int Total { get; init; } | ||||
|         public int Remaining { get; init; } | ||||
|         public int ResetAfter { get; init; } | ||||
|         public int MaxConcurrency { get; init; } | ||||
|     } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user