PluralKit/Myriad/Gateway/Limit/LocalGatewayRatelimiter.cs

73 lines
2.2 KiB
C#
Raw Normal View History

2021-04-29 09:10:19 +00:00
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Serilog;
2021-06-09 14:22:10 +00:00
namespace Myriad.Gateway.Limit
2021-04-29 09:10:19 +00:00
{
2021-06-09 14:22:10 +00:00
/// <summary>
/// In-process <see cref="IGatewayRatelimiter"/> that queues identify requests per bucket
/// (<c>shard % maxConcurrency</c>) and lets at most one waiter per bucket through on each
/// refill pass, with passes spaced <see cref="BucketLength"/> apart.
/// </summary>
public class LocalGatewayRatelimiter: IGatewayRatelimiter
{
    // docs specify 5 seconds, but we're actually throttling connections, not identify, so we need a bit of leeway
    private static readonly TimeSpan BucketLength = TimeSpan.FromSeconds(6);

    // Pause between starting a refill loop and its first pass over the buckets.
    private static readonly TimeSpan InitialRefillDelay = TimeSpan.FromMilliseconds(250);

    private readonly ConcurrentDictionary<int, ConcurrentQueue<TaskCompletionSource>> _buckets = new();
    private readonly int _maxConcurrency;
    private readonly ILogger _logger;

    // Guards _refillTask so that only one refill loop runs at a time and so that the loop's
    // decision to stop cannot interleave with a caller deciding whether to start one.
    private readonly object _refillLock = new();
    private Task? _refillTask;

    /// <summary>Creates a ratelimiter that spreads shards over <paramref name="maxConcurrency"/> buckets.</summary>
    /// <param name="logger">Logger used as the parent context for this class's log output.</param>
    /// <param name="maxConcurrency">Number of buckets; must be positive because it is used as a modulus.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrency"/> is zero or negative.</exception>
    public LocalGatewayRatelimiter(ILogger logger, int maxConcurrency)
    {
        // Fail here rather than with a DivideByZeroException on the first Identify call.
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency), maxConcurrency,
                "Max concurrency must be at least 1.");

        _logger = logger.ForContext<LocalGatewayRatelimiter>();
        _maxConcurrency = maxConcurrency;
    }

    /// <summary>Queues an identify request for the bucket that <paramref name="shard"/> maps to.</summary>
    /// <param name="shard">Shard ID; its bucket is <c>shard % maxConcurrency</c>.</param>
    /// <returns>A task that completes once the refill loop lets this request through.</returns>
    public Task Identify(int shard)
    {
        var bucket = shard % _maxConcurrency;
        var queue = _buckets.GetOrAdd(bucket, _ => new ConcurrentQueue<TaskCompletionSource>());

        // Run continuations asynchronously so that completing this source from the refill loop
        // never executes the caller's continuation inline and delays the remaining buckets.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        // Enqueue before scheduling: the refill loop re-checks the queues under the same lock
        // before stopping, so a waiter that is already enqueued is always seen by some loop.
        queue.Enqueue(tcs);
        ScheduleRefill();
        return tcs.Task;
    }

    /// <summary>Starts the refill loop unless one is already running.</summary>
    private void ScheduleRefill()
    {
        lock (_refillLock)
        {
            if (_refillTask is { IsCompleted: false })
                return;

            // Either no loop has been started, the last one stopped, or it faulted; start a new one.
            _refillTask = RefillTask();
        }
    }

    /// <summary>
    /// Releases one waiter from every non-empty bucket, waits <see cref="BucketLength"/>, and
    /// repeats until a pass releases nothing and all queues are empty.
    /// </summary>
    private async Task RefillTask()
    {
        await Task.Delay(InitialRefillDelay);

        while (true)
        {
            var releasedAny = false;
            foreach (var (bucket, queue) in _buckets)
            {
                if (!queue.TryDequeue(out var tcs))
                    continue;

                _logger.Debug(
                    "Allowing identify for bucket {BucketId} through ({QueueLength} left in bucket queue)",
                    bucket, queue.Count);
                tcs.SetResult();
                releasedAny = true;
            }

            if (releasedAny)
            {
                await Task.Delay(BucketLength);
                continue;
            }

            lock (_refillLock)
            {
                if (!HasPendingWaiters())
                {
                    // Clear the handle while still holding the lock so the next Identify call
                    // starts a fresh loop instead of observing this one as still running.
                    _refillTask = null;
                    return;
                }
            }

            // A waiter was enqueued while this pass was running; nothing was released on this
            // pass, so go around again immediately and pick it up.
        }
    }

    /// <summary>Returns whether any bucket queue currently holds a waiter.</summary>
    private bool HasPendingWaiters()
    {
        foreach (var (_, queue) in _buckets)
        {
            if (!queue.IsEmpty)
                return true;
        }

        return false;
    }
}
}