Refactor proxy service

This commit is contained in:
Ske 2019-08-12 05:47:55 +02:00
parent fda98bc25f
commit 145ecb91ad
4 changed files with 153 additions and 142 deletions

View File

@ -104,6 +104,7 @@ namespace PluralKit.Bot
.AddTransient<ProxyService>() .AddTransient<ProxyService>()
.AddTransient<LogChannelService>() .AddTransient<LogChannelService>()
.AddTransient<DataFileService>() .AddTransient<DataFileService>()
.AddTransient<WebhookExecutorService>()
.AddTransient<ProxyCacheService>() .AddTransient<ProxyCacheService>()
.AddSingleton<WebhookCacheService>() .AddSingleton<WebhookCacheService>()

View File

@ -23,27 +23,23 @@ namespace PluralKit.Bot
class ProxyService: IDisposable { class ProxyService: IDisposable {
private IDiscordClient _client; private IDiscordClient _client;
private DbConnectionFactory _conn;
private LogChannelService _logChannel; private LogChannelService _logChannel;
private WebhookCacheService _webhookCache;
private MessageStore _messageStorage; private MessageStore _messageStorage;
private EmbedService _embeds; private EmbedService _embeds;
private IMetrics _metrics;
private ILogger _logger; private ILogger _logger;
private WebhookExecutorService _webhookExecutor;
private ProxyCacheService _cache; private ProxyCacheService _cache;
private HttpClient _httpClient; private HttpClient _httpClient;
public ProxyService(IDiscordClient client, WebhookCacheService webhookCache, DbConnectionFactory conn, LogChannelService logChannel, MessageStore messageStorage, EmbedService embeds, IMetrics metrics, ILogger logger, ProxyCacheService cache) public ProxyService(IDiscordClient client, LogChannelService logChannel, MessageStore messageStorage, EmbedService embeds, ILogger logger, ProxyCacheService cache, WebhookExecutorService webhookExecutor)
{ {
_client = client; _client = client;
_webhookCache = webhookCache;
_conn = conn;
_logChannel = logChannel; _logChannel = logChannel;
_messageStorage = messageStorage; _messageStorage = messageStorage;
_embeds = embeds; _embeds = embeds;
_metrics = metrics;
_cache = cache; _cache = cache;
_webhookExecutor = webhookExecutor;
_logger = logger.ForContext<ProxyService>(); _logger = logger.ForContext<ProxyService>();
_httpClient = new HttpClient(); _httpClient = new HttpClient();
@ -84,7 +80,7 @@ namespace PluralKit.Bot
public async Task HandleMessageAsync(IMessage message) public async Task HandleMessageAsync(IMessage message)
{ {
// Bail early if this isn't in a guild channel // Bail early if this isn't in a guild channel
if (!(message.Channel is IGuildChannel)) return; if (!(message.Channel is ITextChannel)) return;
var results = await _cache.GetResultsFor(message.Author.Id); var results = await _cache.GetResultsFor(message.Author.Id);
@ -100,15 +96,20 @@ namespace PluralKit.Bot
if (match.InnerText.Trim().Length == 0 && message.Attachments.Count == 0) if (match.InnerText.Trim().Length == 0 && message.Attachments.Count == 0)
return; return;
// Get variables in order and all
var proxyName = match.Member.ProxyName(match.System.Tag);
var avatarUrl = match.Member.AvatarUrl ?? match.System.AvatarUrl;
// Sanitize @everyone, but only if the original user wouldn't have permission to // Sanitize @everyone, but only if the original user wouldn't have permission to
var messageContents = SanitizeEveryoneMaybe(message, match.InnerText); var messageContents = SanitizeEveryoneMaybe(message, match.InnerText);
// Fetch a webhook for this channel, and send the proxied message // Execute the webhook itself
var webhookCacheEntry = await _webhookCache.GetWebhook(message.Channel as ITextChannel); var hookMessageId = await _webhookExecutor.ExecuteWebhook(
var avatarUrl = match.Member.AvatarUrl ?? match.System.AvatarUrl; (ITextChannel) message.Channel,
var proxyName = match.Member.ProxyName(match.System.Tag); proxyName, avatarUrl,
messageContents,
var hookMessageId = await ExecuteWebhookRetrying(message, webhookCacheEntry, messageContents, proxyName, avatarUrl); message.Attachments.FirstOrDefault()
);
// Store the message in the database, and log it in the log channel (if applicable) // Store the message in the database, and log it in the log channel (if applicable)
await _messageStorage.Store(message.Author.Id, hookMessageId, message.Channel.Id, message.Id, match.Member); await _messageStorage.Store(message.Author.Id, hookMessageId, message.Channel.Id, message.Id, match.Member);
@ -120,37 +121,12 @@ namespace PluralKit.Bot
try try
{ {
await message.DeleteAsync(); await message.DeleteAsync();
} catch (HttpException) {} // If it's already deleted, we just swallow the exception
} }
catch (HttpException)
private async Task<ulong> ExecuteWebhookRetrying(IMessage message, WebhookCacheService.WebhookCacheEntry webhookCacheEntry, string messageContents,
string proxyName, string avatarUrl)
{ {
// In the case where the webhook is deleted, we'll only actually notice that // If it's already deleted, we just log and swallow the exception
// when we try to execute the webhook. Therefore we try it once, and _logger.Warning("Attempted to delete already deleted proxy trigger message {Message}", message.Id);
// if Discord returns error 10015 ("Unknown Webhook"), we remake it and try again.
ulong hookMessageId;
try
{
using (_metrics.Measure.Timer.Time(BotMetrics.WebhookResponseTime))
hookMessageId = await ExecuteWebhook(webhookCacheEntry, messageContents, proxyName, avatarUrl,
message.Attachments.FirstOrDefault());
} }
catch (HttpException e)
{
if (e.DiscordCode == 10015)
{
_logger.Warning("Webhook {Webhook} not found, recreating one", webhookCacheEntry.Webhook.Id);
webhookCacheEntry = await _webhookCache.InvalidateAndRefreshWebhook(webhookCacheEntry);
using (_metrics.Measure.Timer.Time(BotMetrics.WebhookResponseTime))
hookMessageId = await ExecuteWebhook(webhookCacheEntry, messageContents, proxyName, avatarUrl,
message.Attachments.FirstOrDefault());
}
else throw;
}
return hookMessageId;
} }
private static string SanitizeEveryoneMaybe(IMessage message, string messageContents) private static string SanitizeEveryoneMaybe(IMessage message, string messageContents)
@ -182,51 +158,6 @@ namespace PluralKit.Bot
return true; return true;
} }
private async Task<ulong> ExecuteWebhook(WebhookCacheService.WebhookCacheEntry cacheEntry, string text, string username, string avatarUrl, IAttachment attachment)
{
_logger.Debug("Invoking webhook");
username = FixClyde(username);
// TODO: clean this entire block up
ulong messageId;
try
{
var client = cacheEntry.Client;
if (attachment != null)
{
using (var stream = await _httpClient.GetStreamAsync(attachment.Url))
{
messageId = await client.SendFileAsync(stream, filename: attachment.Filename, text: text,
username: username, avatarUrl: avatarUrl);
}
}
else
{
messageId = await client.SendMessageAsync(text, username: username, avatarUrl: avatarUrl);
}
_logger.Information("Invoked webhook {Webhook} in channel {Channel}", cacheEntry.Webhook.Id,
cacheEntry.Webhook.ChannelId);
// Log it in the metrics
_metrics.Measure.Meter.Mark(BotMetrics.MessagesProxied, "success");
}
catch (HttpException e)
{
_logger.Warning(e, "Error invoking webhook {Webhook} in channel {Channel}", cacheEntry.Webhook.Id,
cacheEntry.Webhook.ChannelId);
// Log failure in metrics and rethrow (we still need to cancel everything else)
_metrics.Measure.Meter.Mark(BotMetrics.MessagesProxied, "failure");
throw;
}
// TODO: figure out a way to return the full message object (without doing a GetMessageAsync call, which
// doesn't work if there's no permission to)
return messageId;
}
public Task HandleReactionAddedAsync(Cacheable<IUserMessage, ulong> message, ISocketMessageChannel channel, SocketReaction reaction) public Task HandleReactionAddedAsync(Cacheable<IUserMessage, ulong> message, ISocketMessageChannel channel, SocketReaction reaction)
{ {
// Dispatch on emoji // Dispatch on emoji
@ -298,16 +229,6 @@ namespace PluralKit.Bot
await _messageStorage.BulkDelete(messages.Select(m => m.Id).ToList()); await _messageStorage.BulkDelete(messages.Select(m => m.Id).ToList());
} }
private string FixClyde(string name)
{
var match = Regex.Match(name, "clyde", RegexOptions.IgnoreCase);
if (!match.Success) return name;
// Put a hair space (\u200A) between the "c" and the "lyde" in the match to avoid Discord matching it
// since Discord blocks webhooks containing the word "Clyde"... for some reason. /shrug
return name.Substring(0, match.Index + 1) + '\u200A' + name.Substring(match.Index + 1);
}
public void Dispose() public void Dispose()
{ {
_httpClient.Dispose(); _httpClient.Dispose();

View File

@ -8,18 +8,12 @@ using Serilog;
namespace PluralKit.Bot namespace PluralKit.Bot
{ {
public class WebhookCacheService: IDisposable public class WebhookCacheService
{ {
public class WebhookCacheEntry
{
internal DiscordWebhookClient Client;
internal IWebhook Webhook;
}
public static readonly string WebhookName = "PluralKit Proxy Webhook"; public static readonly string WebhookName = "PluralKit Proxy Webhook";
private IDiscordClient _client; private IDiscordClient _client;
private ConcurrentDictionary<ulong, Lazy<Task<WebhookCacheEntry>>> _webhooks; private ConcurrentDictionary<ulong, Lazy<Task<IWebhook>>> _webhooks;
private ILogger _logger; private ILogger _logger;
@ -27,44 +21,41 @@ namespace PluralKit.Bot
{ {
_client = client; _client = client;
_logger = logger.ForContext<WebhookCacheService>(); _logger = logger.ForContext<WebhookCacheService>();
_webhooks = new ConcurrentDictionary<ulong, Lazy<Task<WebhookCacheEntry>>>(); _webhooks = new ConcurrentDictionary<ulong, Lazy<Task<IWebhook>>>();
} }
public async Task<WebhookCacheEntry> GetWebhook(ulong channelId) public async Task<IWebhook> GetWebhook(ulong channelId)
{ {
var channel = await _client.GetChannelAsync(channelId) as ITextChannel; var channel = await _client.GetChannelAsync(channelId) as ITextChannel;
if (channel == null) return null; if (channel == null) return null;
return await GetWebhook(channel); return await GetWebhook(channel);
} }
public async Task<WebhookCacheEntry> GetWebhook(ITextChannel channel) public async Task<IWebhook> GetWebhook(ITextChannel channel)
{ {
// We cache the webhook through a Lazy<Task<T>>, this way we make sure to only create one webhook per channel // We cache the webhook through a Lazy<Task<T>>, this way we make sure to only create one webhook per channel
// If the webhook is requested twice before it's actually been found, the Lazy<T> wrapper will stop the // If the webhook is requested twice before it's actually been found, the Lazy<T> wrapper will stop the
// webhook from being created twice. // webhook from being created twice.
var lazyWebhookValue = var lazyWebhookValue =
_webhooks.GetOrAdd(channel.Id, new Lazy<Task<WebhookCacheEntry>>(() => GetOrCreateWebhook(channel))); _webhooks.GetOrAdd(channel.Id, new Lazy<Task<IWebhook>>(() => GetOrCreateWebhook(channel)));
// It's possible to "move" a webhook to a different channel after creation // It's possible to "move" a webhook to a different channel after creation
// Here, we ensure it's actually still pointing towards the proper channel, and if not, wipe and refetch one. // Here, we ensure it's actually still pointing towards the proper channel, and if not, wipe and refetch one.
var webhook = await lazyWebhookValue.Value; var webhook = await lazyWebhookValue.Value;
if (webhook.Webhook.ChannelId != channel.Id) return await InvalidateAndRefreshWebhook(webhook); if (webhook.ChannelId != channel.Id) return await InvalidateAndRefreshWebhook(webhook);
return webhook; return webhook;
} }
public async Task<WebhookCacheEntry> InvalidateAndRefreshWebhook(WebhookCacheEntry webhook) public async Task<IWebhook> InvalidateAndRefreshWebhook(IWebhook webhook)
{ {
_logger.Information("Refreshing webhook for channel {Channel}", webhook.Webhook.ChannelId); _logger.Information("Refreshing webhook for channel {Channel}", webhook.ChannelId);
_webhooks.TryRemove(webhook.Webhook.ChannelId, out _); _webhooks.TryRemove(webhook.ChannelId, out _);
return await GetWebhook(webhook.Webhook.Channel); return await GetWebhook(webhook.Channel);
} }
private async Task<WebhookCacheEntry> GetOrCreateWebhook(ITextChannel channel) private async Task<IWebhook> GetOrCreateWebhook(ITextChannel channel) =>
{ await FindExistingWebhook(channel) ?? await DoCreateWebhook(channel);
var webhook = await FindExistingWebhook(channel) ?? await DoCreateWebhook(channel);
return await DoCreateWebhookClient(webhook);
}
private async Task<IWebhook> FindExistingWebhook(ITextChannel channel) private async Task<IWebhook> FindExistingWebhook(ITextChannel channel)
{ {
@ -78,28 +69,8 @@ namespace PluralKit.Bot
return channel.CreateWebhookAsync(WebhookName); return channel.CreateWebhookAsync(WebhookName);
} }
private Task<WebhookCacheEntry> DoCreateWebhookClient(IWebhook webhook)
{
// DiscordWebhookClient's ctor is synchronous despite doing web calls, so we wrap it in a Task
return Task.Run(() =>
{
return new WebhookCacheEntry
{
Client = new DiscordWebhookClient(webhook),
Webhook = webhook
};
});
}
private bool IsWebhookMine(IWebhook arg) => arg.Creator.Id == _client.CurrentUser.Id && arg.Name == WebhookName; private bool IsWebhookMine(IWebhook arg) => arg.Creator.Id == _client.CurrentUser.Id && arg.Name == WebhookName;
public int CacheSize => _webhooks.Count; public int CacheSize => _webhooks.Count;
public void Dispose()
{
foreach (var entry in _webhooks.Values)
if (entry.IsValueCreated)
entry.Value.Result.Client.Dispose();
}
} }
} }

View File

@ -0,0 +1,118 @@
using System;
using System.Net.Http;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Logging;
using Discord;
using Discord.Net;
using Discord.Webhook;
using Microsoft.Extensions.Caching.Memory;
using Serilog;
namespace PluralKit.Bot
{
public class WebhookExecutorService: IDisposable
{
private WebhookCacheService _webhookCache;
private IMemoryCache _cache;
private ILogger _logger;
private IMetrics _metrics;
private HttpClient _client;
public WebhookExecutorService(IMemoryCache cache, IMetrics metrics, WebhookCacheService webhookCache, ILogger logger)
{
_cache = cache;
_metrics = metrics;
_webhookCache = webhookCache;
_logger = logger.ForContext<WebhookExecutorService>();
_client = new HttpClient();
}
public async Task<ulong> ExecuteWebhook(ITextChannel channel, string name, string avatarUrl, string content, IAttachment attachment)
{
_logger.Debug("Invoking webhook in channel {Channel}", channel.Id);
// Get a webhook, execute it
var webhook = await _webhookCache.GetWebhook(channel);
var id = await ExecuteWebhookInner(webhook, name, avatarUrl, content, attachment);
// Log the relevant metrics
_metrics.Measure.Meter.Mark(BotMetrics.MessagesProxied);
_logger.Information("Invoked webhook {Webhook} in channel {Channel}", webhook.Id,
channel.Id);
return id;
}
private async Task<ulong> ExecuteWebhookInner(IWebhook webhook, string name, string avatarUrl, string content,
IAttachment attachment, bool hasRetried = false)
{
var client = await GetClientFor(webhook);
try
{
// If we have an attachment, use the special SendFileAsync method
if (attachment != null)
using (var attachmentStream = await _client.GetStreamAsync(attachment.Url))
using (_metrics.Measure.Timer.Time(BotMetrics.WebhookResponseTime))
return await client.SendFileAsync(attachmentStream, attachment.Filename, content,
username: FixClyde(name),
avatarUrl: avatarUrl);
// Otherwise, send normally
return await client.SendMessageAsync(content, username: name, avatarUrl: avatarUrl);
}
catch (HttpException e)
{
// If we hit an error, just retry (if we haven't already)
if (e.DiscordCode == 10015 && !hasRetried) // Error 10015 = "Unknown Webhook"
{
_logger.Warning(e, "Error invoking webhook {Webhook} in channel {Channel}", webhook.Id, webhook.ChannelId);
return await ExecuteWebhookInner(await _webhookCache.InvalidateAndRefreshWebhook(webhook), name, avatarUrl, content, attachment, hasRetried: true);
}
throw;
}
}
private async Task<DiscordWebhookClient> GetClientFor(IWebhook webhook)
{
_logger.Debug("Looking for client for webhook {Webhook} in cache", webhook.Id);
return await _cache.GetOrCreateAsync($"_webhook_client_{webhook.Id}",
(entry) => MakeCachedClientFor(entry, webhook));
}
private Task<DiscordWebhookClient> MakeCachedClientFor(ICacheEntry entry, IWebhook webhook) {
_logger.Information("Client for {Webhook} not found in cache, creating", webhook.Id);
// Define expiration for the client cache
// 10 minutes *without a query* and it gets yoten
entry.SlidingExpiration = TimeSpan.FromMinutes(10);
// IMemoryCache won't automatically dispose of its values when the cache gets evicted
// so we register a hook to do so here.
entry.RegisterPostEvictionCallback((key, value, reason, state) => (value as IDisposable)?.Dispose());
// DiscordWebhookClient has a sync network call in its constructor (!!!)
// and we want to punt that onto a task queue, so we do that.
return Task.Run(() => new DiscordWebhookClient(webhook));
}
private string FixClyde(string name)
{
// Check if the name contains "Clyde" - if not, do nothing
var match = Regex.Match(name, "clyde", RegexOptions.IgnoreCase);
if (!match.Success) return name;
// Put a hair space (\u200A) between the "c" and the "lyde" in the match to avoid Discord matching it
// since Discord blocks webhooks containing the word "Clyde"... for some reason. /shrug
return name.Substring(0, match.Index + 1) + '\u200A' + name.Substring(match.Index + 1);
}
public void Dispose()
{
_client.Dispose();
}
}
}