PluralKit/Myriad/Gateway/Cluster.cs

116 lines
4.1 KiB
C#
Raw Normal View History

2020-12-22 12:15:26 +00:00
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
2021-02-01 13:26:39 +00:00
using System.Threading;
2020-12-22 12:15:26 +00:00
using System.Threading.Tasks;
using Myriad.Types;
using Serilog;
namespace Myriad.Gateway
{
public class Cluster
{
private readonly GatewaySettings _gatewaySettings;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<int, Shard> _shards = new();
public Cluster(GatewaySettings gatewaySettings, ILogger logger)
{
_gatewaySettings = gatewaySettings;
_logger = logger;
}
public Func<Shard, IGatewayEvent, Task>? EventReceived { get; set; }
2021-01-30 00:07:43 +00:00
public event Action<Shard>? ShardCreated;
2020-12-22 12:15:26 +00:00
public IReadOnlyDictionary<int, Shard> Shards => _shards;
public ClusterSessionState SessionState => GetClusterState();
public User? User => _shards.Values.Select(s => s.User).FirstOrDefault(s => s != null);
2020-12-24 13:52:44 +00:00
public ApplicationPartial? Application => _shards.Values.Select(s => s.Application).FirstOrDefault(s => s != null);
2020-12-22 12:15:26 +00:00
private ClusterSessionState GetClusterState()
{
var shards = new List<ClusterSessionState.ShardState>();
foreach (var (id, shard) in _shards)
shards.Add(new ClusterSessionState.ShardState
{
2021-01-30 00:07:43 +00:00
Shard = shard.ShardInfo,
Session = shard.SessionInfo
2020-12-22 12:15:26 +00:00
});
return new ClusterSessionState {Shards = shards};
}
public async Task Start(GatewayInfo.Bot info, ClusterSessionState? lastState = null)
{
if (lastState != null && lastState.Shards.Count == info.Shards)
2021-02-01 13:26:39 +00:00
await Resume(info.Url, lastState, info.SessionStartLimit.MaxConcurrency);
2020-12-22 12:15:26 +00:00
else
2021-02-01 13:26:39 +00:00
await Start(info.Url, info.Shards, info.SessionStartLimit.MaxConcurrency);
2020-12-22 12:15:26 +00:00
}
2021-02-01 13:26:39 +00:00
public async Task Resume(string url, ClusterSessionState sessionState, int concurrency)
2020-12-22 12:15:26 +00:00
{
_logger.Information("Resuming session with {ShardCount} shards at {Url}", sessionState.Shards.Count, url);
foreach (var shardState in sessionState.Shards)
CreateAndAddShard(url, shardState.Shard, shardState.Session);
2021-02-01 13:26:39 +00:00
await StartShards(concurrency);
2020-12-22 12:15:26 +00:00
}
2021-02-01 13:26:39 +00:00
public async Task Start(string url, int shardCount, int concurrency)
2020-12-22 12:15:26 +00:00
{
_logger.Information("Starting {ShardCount} shards at {Url}", shardCount, url);
for (var i = 0; i < shardCount; i++)
CreateAndAddShard(url, new ShardInfo(i, shardCount), null);
2021-02-01 13:26:39 +00:00
await StartShards(concurrency);
2020-12-22 12:15:26 +00:00
}
2021-02-01 13:26:39 +00:00
private async Task StartShards(int concurrency)
2020-12-22 12:15:26 +00:00
{
2021-02-01 13:26:39 +00:00
var lastTime = DateTimeOffset.UtcNow;
var identifyCalls = 0;
2020-12-22 12:15:26 +00:00
_logger.Information("Connecting shards...");
2021-02-01 13:26:39 +00:00
foreach (var shard in _shards.Values)
{
if (identifyCalls >= concurrency)
{
var timeout = lastTime + TimeSpan.FromSeconds(5.5);
var delay = timeout - DateTimeOffset.UtcNow;
if (delay > TimeSpan.Zero)
{
_logger.Information("Hit shard concurrency limit, waiting {Delay}", delay);
await Task.Delay(delay);
}
identifyCalls = 0;
lastTime = DateTimeOffset.UtcNow;
}
await shard.Start();
identifyCalls++;
}
2020-12-22 12:15:26 +00:00
}
private void CreateAndAddShard(string url, ShardInfo shardInfo, ShardSessionInfo? session)
{
var shard = new Shard(_logger, new Uri(url), _gatewaySettings, shardInfo, session);
shard.OnEventReceived += evt => OnShardEventReceived(shard, evt);
_shards[shardInfo.ShardId] = shard;
2021-01-30 00:07:43 +00:00
ShardCreated?.Invoke(shard);
2020-12-22 12:15:26 +00:00
}
private async Task OnShardEventReceived(Shard shard, IGatewayEvent evt)
{
if (EventReceived != null)
await EventReceived(shard, evt);
}
}
}