diff --git a/Myriad/Gateway/Cluster.cs b/Myriad/Gateway/Cluster.cs index 07695a5e..d7e0eadb 100644 --- a/Myriad/Gateway/Cluster.cs +++ b/Myriad/Gateway/Cluster.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Myriad.Gateway.Limit; using Myriad.Types; using Serilog; @@ -15,7 +16,7 @@ namespace Myriad.Gateway private readonly GatewaySettings _gatewaySettings; private readonly ILogger _logger; private readonly ConcurrentDictionary _shards = new(); - private ShardIdentifyRatelimiter? _ratelimiter; + private IGatewayRatelimiter? _ratelimiter; public Cluster(GatewaySettings gatewaySettings, ILogger logger) { @@ -35,11 +36,10 @@ namespace Myriad.Gateway await Start(info.Url, 0, info.Shards - 1, info.Shards, info.SessionStartLimit.MaxConcurrency); } - public async Task Start(string url, int shardMin, int shardMax, int shardTotal, int concurrency) + public async Task Start(string url, int shardMin, int shardMax, int shardTotal, int recommendedConcurrency) { - concurrency = GetActualShardConcurrency(concurrency); - _ratelimiter = new(_logger, concurrency); - + _ratelimiter = GetRateLimiter(recommendedConcurrency); + var shardCount = shardMax - shardMin + 1; _logger.Information("Starting {ShardCount} of {ShardTotal} shards (#{ShardMin}-#{ShardMax}) at {Url}", shardCount, shardTotal, shardMin, shardMax, url); @@ -77,5 +77,16 @@ namespace Myriad.Gateway return Math.Min(_gatewaySettings.MaxShardConcurrency.Value, recommendedConcurrency); } + + private IGatewayRatelimiter GetRateLimiter(int recommendedConcurrency) + { + if (_gatewaySettings.GatewayQueueUrl != null) + { + return new TwilightGatewayRatelimiter(_logger, _gatewaySettings.GatewayQueueUrl); + } + + var concurrency = GetActualShardConcurrency(recommendedConcurrency); + return new LocalGatewayRatelimiter(_logger, concurrency); + } } } \ No newline at end of file diff --git a/Myriad/Gateway/GatewaySettings.cs b/Myriad/Gateway/GatewaySettings.cs index 1fdb12fd..a0e6a59e 100644 --- a/Myriad/Gateway/GatewaySettings.cs +++ b/Myriad/Gateway/GatewaySettings.cs @@ -5,5 +5,6 @@ public string Token { get; init; } public GatewayIntent Intents { get; init; } public int? MaxShardConcurrency { get; init; } + public string? GatewayQueueUrl { get; init; } } } \ No newline at end of file diff --git a/Myriad/Gateway/Limit/IGatewayRatelimiter.cs b/Myriad/Gateway/Limit/IGatewayRatelimiter.cs new file mode 100644 index 00000000..b5552e25 --- /dev/null +++ b/Myriad/Gateway/Limit/IGatewayRatelimiter.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Myriad.Gateway.Limit +{ + public interface IGatewayRatelimiter + { + public Task Identify(int shard); + } +} \ No newline at end of file diff --git a/Myriad/Gateway/ShardIdentifyRatelimiter.cs b/Myriad/Gateway/Limit/LocalGatewayRatelimiter.cs similarity index 87% rename from Myriad/Gateway/ShardIdentifyRatelimiter.cs rename to Myriad/Gateway/Limit/LocalGatewayRatelimiter.cs index 619eccfd..995f7288 100644 --- a/Myriad/Gateway/ShardIdentifyRatelimiter.cs +++ b/Myriad/Gateway/Limit/LocalGatewayRatelimiter.cs @@ -4,9 +4,9 @@ using System.Threading.Tasks; using Serilog; -namespace Myriad.Gateway +namespace Myriad.Gateway.Limit { - public class ShardIdentifyRatelimiter + public class LocalGatewayRatelimiter: IGatewayRatelimiter { // docs specify 5 seconds, but we're actually throttling connections, not identify, so we need a bit of leeway private static readonly TimeSpan BucketLength = TimeSpan.FromSeconds(6); @@ -17,13 +17,13 @@ namespace Myriad.Gateway private Task? _refillTask; private readonly ILogger _logger; - public ShardIdentifyRatelimiter(ILogger logger, int maxConcurrency) + public LocalGatewayRatelimiter(ILogger logger, int maxConcurrency) { - _logger = logger.ForContext(); + _logger = logger.ForContext(); _maxConcurrency = maxConcurrency; } - public Task Acquire(int shard) + public Task Identify(int shard) { var bucket = shard % _maxConcurrency; var queue = _buckets.GetOrAdd(bucket, _ => new ConcurrentQueue()); diff --git a/Myriad/Gateway/Limit/TwilightGatewayRatelimiter.cs b/Myriad/Gateway/Limit/TwilightGatewayRatelimiter.cs new file mode 100644 index 00000000..7859eade --- /dev/null +++ b/Myriad/Gateway/Limit/TwilightGatewayRatelimiter.cs @@ -0,0 +1,27 @@ +using System.Net.Http; +using System.Threading.Tasks; + +using Serilog; + +namespace Myriad.Gateway.Limit +{ + public class TwilightGatewayRatelimiter: IGatewayRatelimiter + { + private readonly string _url; + private readonly ILogger _logger; + private readonly HttpClient _httpClient = new(); + + public TwilightGatewayRatelimiter(ILogger logger, string url) + { + _url = url; + _logger = logger; + } + + public async Task Identify(int shard) + { + // Literally just request and wait :p + _logger.Information("Shard {ShardId}: Requesting identify at gateway queue {GatewayQueueUrl}", shard, _url); + await _httpClient.GetAsync(_url); + } + } +} \ No newline at end of file diff --git a/Myriad/Gateway/Shard.cs b/Myriad/Gateway/Shard.cs index f90b0741..c7f84230 100644 --- a/Myriad/Gateway/Shard.cs +++ b/Myriad/Gateway/Shard.cs @@ -3,6 +3,7 @@ using System.Net.WebSockets; using System.Text.Json; using System.Threading.Tasks; +using Myriad.Gateway.Limit; using Myriad.Gateway.State; using Myriad.Serialization; using Myriad.Types; @@ -17,7 +18,7 @@ namespace Myriad.Gateway private readonly GatewaySettings _settings; private readonly ShardInfo _info; - private readonly ShardIdentifyRatelimiter _ratelimiter; + private readonly IGatewayRatelimiter _ratelimiter; private readonly string _url; private readonly ILogger _logger; private readonly ShardStateManager _stateManager; @@ -41,7 +42,7 @@ namespace Myriad.Gateway private TimeSpan _reconnectDelay = TimeSpan.Zero; private Task? _worker; - public Shard(GatewaySettings settings, ShardInfo info, ShardIdentifyRatelimiter ratelimiter, string url, ILogger logger) + public Shard(GatewaySettings settings, ShardInfo info, IGatewayRatelimiter ratelimiter, string url, ILogger logger) { _jsonSerializerOptions = new JsonSerializerOptions().ConfigureForMyriad(); @@ -105,11 +106,14 @@ namespace Myriad.Gateway } } - public Task Start() + public async Task Start() { if (_worker == null) _worker = ShardLoop(); - return Task.CompletedTask; + + // we can probably TCS this instead of spin loop but w/e + while (State != ShardState.Connected) + await Task.Delay(100); } public async Task UpdateStatus(GatewayStatusUpdate payload) @@ -125,7 +129,7 @@ namespace Myriad.Gateway { while (true) { - await _ratelimiter.Acquire(_info.ShardId); + await _ratelimiter.Identify(_info.ShardId); _logger.Information("Shard {ShardId}: Connecting to WebSocket", _info.ShardId); try diff --git a/PluralKit.Bot/BotConfig.cs b/PluralKit.Bot/BotConfig.cs index 8215afbb..eeae52cd 100644 --- a/PluralKit.Bot/BotConfig.cs +++ b/PluralKit.Bot/BotConfig.cs @@ -17,6 +17,8 @@ namespace PluralKit.Bot public ulong? AdminRole { get; set; } public ClusterSettings? Cluster { get; set; } + + public string? GatewayQueueUrl { get; set; } public record ClusterSettings { diff --git a/PluralKit.Bot/Modules.cs b/PluralKit.Bot/Modules.cs index 4a940de4..9a80834f 100644 --- a/PluralKit.Bot/Modules.cs +++ b/PluralKit.Bot/Modules.cs @@ -28,6 +28,7 @@ namespace PluralKit.Bot { Token = botConfig.Token, MaxShardConcurrency = botConfig.MaxShardConcurrency, + GatewayQueueUrl = botConfig.GatewayQueueUrl, Intents = GatewayIntent.Guilds | GatewayIntent.DirectMessages | GatewayIntent.DirectMessageReactions |