feat: add Redis identify ratelimiter
This commit is contained in:
@@ -5,6 +5,8 @@ using Myriad.Types;
|
||||
|
||||
using Serilog;
|
||||
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace Myriad.Gateway;
|
||||
|
||||
public class Cluster
|
||||
@@ -25,14 +27,14 @@ public class Cluster
|
||||
public IReadOnlyDictionary<int, Shard> Shards => _shards;
|
||||
public event Action<Shard>? ShardCreated;
|
||||
|
||||
public async Task Start(GatewayInfo.Bot info)
|
||||
public async Task Start(GatewayInfo.Bot info, ConnectionMultiplexer? conn = null)
|
||||
{
|
||||
await Start(info.Url, 0, info.Shards - 1, info.Shards, info.SessionStartLimit.MaxConcurrency);
|
||||
await Start(info.Url, 0, info.Shards - 1, info.Shards, info.SessionStartLimit.MaxConcurrency, conn);
|
||||
}
|
||||
|
||||
public async Task Start(string url, int shardMin, int shardMax, int shardTotal, int recommendedConcurrency)
|
||||
public async Task Start(string url, int shardMin, int shardMax, int shardTotal, int recommendedConcurrency, ConnectionMultiplexer? conn = null)
|
||||
{
|
||||
_ratelimiter = GetRateLimiter(recommendedConcurrency);
|
||||
_ratelimiter = GetRateLimiter(recommendedConcurrency, conn);
|
||||
|
||||
var shardCount = shardMax - shardMin + 1;
|
||||
_logger.Information("Starting {ShardCount} of {ShardTotal} shards (#{ShardMin}-#{ShardMax}) at {Url}",
|
||||
@@ -73,12 +75,21 @@ public class Cluster
|
||||
return Math.Min(_gatewaySettings.MaxShardConcurrency.Value, recommendedConcurrency);
|
||||
}
|
||||
|
||||
private IGatewayRatelimiter GetRateLimiter(int recommendedConcurrency)
|
||||
private IGatewayRatelimiter GetRateLimiter(int recommendedConcurrency, ConnectionMultiplexer? conn = null)
|
||||
{
|
||||
var concurrency = GetActualShardConcurrency(recommendedConcurrency);
|
||||
|
||||
if (_gatewaySettings.UseRedisRatelimiter)
|
||||
{
|
||||
if (conn != null)
|
||||
return new RedisRatelimiter(_logger, conn, concurrency);
|
||||
else
|
||||
_logger.Warning("Tried to get Redis ratelimiter but connection is null! Continuing with local ratelimiter.");
|
||||
}
|
||||
|
||||
if (_gatewaySettings.GatewayQueueUrl != null)
|
||||
return new TwilightGatewayRatelimiter(_logger, _gatewaySettings.GatewayQueueUrl);
|
||||
|
||||
var concurrency = GetActualShardConcurrency(recommendedConcurrency);
|
||||
return new LocalGatewayRatelimiter(_logger, concurrency);
|
||||
}
|
||||
}
|
@@ -4,6 +4,7 @@ public record GatewaySettings
|
||||
{
|
||||
public string Token { get; init; }
|
||||
public GatewayIntent Intents { get; init; }
|
||||
public bool UseRedisRatelimiter { get; init; } = false;
|
||||
public int? MaxShardConcurrency { get; init; }
|
||||
public string? GatewayQueueUrl { get; init; }
|
||||
}
|
46
Myriad/Gateway/Limit/RedisRatelimiter.cs
Normal file
46
Myriad/Gateway/Limit/RedisRatelimiter.cs
Normal file
@@ -0,0 +1,46 @@
|
||||
using Serilog;
|
||||
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace Myriad.Gateway.Limit;
|
||||
|
||||
public class RedisRatelimiter: IGatewayRatelimiter
|
||||
{
|
||||
private readonly ILogger _logger;
|
||||
private readonly ConnectionMultiplexer _redis;
|
||||
|
||||
private int _concurrency { get; init; }
|
||||
|
||||
// todo: these might need to be tweaked a little
|
||||
private static TimeSpan expiry = TimeSpan.FromSeconds(5);
|
||||
private static TimeSpan retryInterval = TimeSpan.FromSeconds(1);
|
||||
|
||||
public RedisRatelimiter(ILogger logger, ConnectionMultiplexer redis, int concurrency)
|
||||
{
|
||||
_logger = logger.ForContext<TwilightGatewayRatelimiter>();
|
||||
_redis = redis;
|
||||
_concurrency = concurrency;
|
||||
}
|
||||
|
||||
public async Task Identify(int shard)
|
||||
{
|
||||
_logger.Information("Shard {ShardId}: requesting identify from Redis", shard);
|
||||
var key = "pluralkit:identify:" + (shard % _concurrency).ToString();
|
||||
await AcquireLock(key);
|
||||
}
|
||||
|
||||
public async Task AcquireLock(string key)
|
||||
{
|
||||
var conn = _redis.GetDatabase();
|
||||
|
||||
async Task<bool> TryAcquire()
|
||||
{
|
||||
_logger.Verbose("Trying to acquire lock on key {key} from Redis...", key);
|
||||
await Task.Delay(retryInterval);
|
||||
return await conn!.StringSetAsync(key, 0, expiry, When.NotExists);
|
||||
}
|
||||
|
||||
var acquired = false;
|
||||
while (!acquired) acquired = await TryAcquire();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user