PluralKit/Myriad/Gateway/Cluster.cs

90 lines
3.1 KiB
C#
Raw Normal View History

2021-08-27 15:03:47 +00:00
using System;
2020-12-22 12:15:26 +00:00
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
2021-06-09 14:22:10 +00:00
using Myriad.Gateway.Limit;
2020-12-22 12:15:26 +00:00
using Myriad.Types;
using Serilog;
namespace Myriad.Gateway
{
public class Cluster
{
private readonly GatewaySettings _gatewaySettings;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<int, Shard> _shards = new();
2021-06-09 14:22:10 +00:00
private IGatewayRatelimiter? _ratelimiter;
2020-12-22 12:15:26 +00:00
public Cluster(GatewaySettings gatewaySettings, ILogger logger)
{
_gatewaySettings = gatewaySettings;
2021-06-10 12:21:05 +00:00
_logger = logger.ForContext<Cluster>();
2020-12-22 12:15:26 +00:00
}
public Func<Shard, IGatewayEvent, Task>? EventReceived { get; set; }
2021-01-30 00:07:43 +00:00
public event Action<Shard>? ShardCreated;
2020-12-22 12:15:26 +00:00
public IReadOnlyDictionary<int, Shard> Shards => _shards;
2021-08-27 15:03:47 +00:00
2021-04-29 09:10:19 +00:00
public async Task Start(GatewayInfo.Bot info)
2020-12-22 12:15:26 +00:00
{
await Start(info.Url, 0, info.Shards - 1, info.Shards, info.SessionStartLimit.MaxConcurrency);
2020-12-22 12:15:26 +00:00
}
2021-06-09 14:22:10 +00:00
public async Task Start(string url, int shardMin, int shardMax, int shardTotal, int recommendedConcurrency)
2020-12-22 12:15:26 +00:00
{
2021-06-09 14:22:10 +00:00
_ratelimiter = GetRateLimiter(recommendedConcurrency);
2021-08-27 15:03:47 +00:00
var shardCount = shardMax - shardMin + 1;
_logger.Information("Starting {ShardCount} of {ShardTotal} shards (#{ShardMin}-#{ShardMax}) at {Url}",
shardCount, shardTotal, shardMin, shardMax, url);
for (var i = shardMin; i <= shardMax; i++)
CreateAndAddShard(url, new ShardInfo(i, shardTotal));
2020-12-22 12:15:26 +00:00
2021-04-29 09:10:19 +00:00
await StartShards();
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private async Task StartShards()
2020-12-22 12:15:26 +00:00
{
_logger.Information("Connecting shards...");
2021-08-27 15:03:47 +00:00
foreach (var shard in _shards.Values)
2021-02-01 13:26:39 +00:00
await shard.Start();
2020-12-22 12:15:26 +00:00
}
2021-04-29 09:10:19 +00:00
private void CreateAndAddShard(string url, ShardInfo shardInfo)
2020-12-22 12:15:26 +00:00
{
2021-04-29 09:10:19 +00:00
var shard = new Shard(_gatewaySettings, shardInfo, _ratelimiter!, url, _logger);
2020-12-22 12:15:26 +00:00
shard.OnEventReceived += evt => OnShardEventReceived(shard, evt);
_shards[shardInfo.ShardId] = shard;
2021-08-27 15:03:47 +00:00
2021-01-30 00:07:43 +00:00
ShardCreated?.Invoke(shard);
2020-12-22 12:15:26 +00:00
}
private async Task OnShardEventReceived(Shard shard, IGatewayEvent evt)
{
if (EventReceived != null)
await EventReceived(shard, evt);
}
2021-08-27 15:03:47 +00:00
private int GetActualShardConcurrency(int recommendedConcurrency)
{
if (_gatewaySettings.MaxShardConcurrency == null)
return recommendedConcurrency;
2021-08-27 15:03:47 +00:00
return Math.Min(_gatewaySettings.MaxShardConcurrency.Value, recommendedConcurrency);
}
2021-06-09 14:22:10 +00:00
private IGatewayRatelimiter GetRateLimiter(int recommendedConcurrency)
{
if (_gatewaySettings.GatewayQueueUrl != null)
{
return new TwilightGatewayRatelimiter(_logger, _gatewaySettings.GatewayQueueUrl);
}
2021-08-27 15:03:47 +00:00
2021-06-09 14:22:10 +00:00
var concurrency = GetActualShardConcurrency(recommendedConcurrency);
return new LocalGatewayRatelimiter(_logger, concurrency);
}
2020-12-22 12:15:26 +00:00
}
}