PluralKit/PluralKit.Core/Utils/HandlerQueue.cs

78 lines
2.4 KiB
C#
Raw Normal View History

2020-05-05 14:03:46 +00:00
using System;
using System.Collections.Concurrent;
2020-05-05 14:03:46 +00:00
using System.Threading;
using System.Threading.Tasks;
using NodaTime;
2020-06-13 20:20:24 +00:00
namespace PluralKit.Core
2020-05-05 14:03:46 +00:00
{
public class HandlerQueue<T>
{
private long _seq;
2020-12-24 13:52:44 +00:00
private readonly ConcurrentDictionary<long, HandlerEntry> _handlers = new();
2020-05-05 14:03:46 +00:00
public async Task<T> WaitFor(Func<T, bool> predicate, Duration? timeout = null, CancellationToken ct = default)
{
var timeoutTask = Task.Delay(timeout?.ToTimeSpan() ?? TimeSpan.FromMilliseconds(-1), ct);
var tcs = new TaskCompletionSource<T>();
ValueTask Handler(T e)
2020-05-05 14:03:46 +00:00
{
tcs.SetResult(e);
return default;
2020-05-05 14:03:46 +00:00
}
2021-08-27 15:03:47 +00:00
var entry = new HandlerEntry { Predicate = predicate, Handler = Handler };
_handlers[Interlocked.Increment(ref _seq)] = entry;
2020-05-05 14:03:46 +00:00
// Wait for either the event task or the timeout task
// If the timeout task finishes first, raise, otherwise pass event through
try
{
var theTask = await Task.WhenAny(timeoutTask, tcs.Task);
if (theTask == timeoutTask)
throw new TimeoutException();
}
finally
{
entry.Remove();
}
return await tcs.Task;
}
public async Task<bool> TryHandle(T evt)
{
// First pass to clean up dead handlers
foreach (var (k, entry) in _handlers)
if (!entry.Alive)
_handlers.TryRemove(k, out _);
2020-05-05 14:03:46 +00:00
// Now iterate and try handling until we find a good one
2020-05-05 14:03:46 +00:00
var now = SystemClock.Instance.GetCurrentInstant();
foreach (var (_, entry) in _handlers)
2020-05-05 14:03:46 +00:00
{
if (entry.Expiry < now) entry.Alive = false;
else if (entry.Alive && entry.Predicate(evt))
2020-05-05 14:03:46 +00:00
{
await entry.Handler(evt);
2020-05-05 14:03:46 +00:00
entry.Alive = false;
return true;
}
}
return false;
}
public class HandlerEntry
{
internal Func<T, ValueTask> Handler;
internal Func<T, bool> Predicate;
2020-05-05 14:03:46 +00:00
internal bool Alive = true;
internal Instant Expiry = SystemClock.Instance.GetCurrentInstant() + Duration.FromMinutes(30);
public void Remove() => Alive = false;
}
}
}