Several more database-y refactors

- DbConnectionFactory renamed to "Database", will now be the primary entry point for DB stuff
- Created IPKConnection interface mostly containing async extensions to IDbConnection, use this going forward
- Reworked the Connection/Command wrappers (that have performance/logging extensions)
- Probably more stuff that I forgot???
This commit is contained in:
Ske
2020-06-13 18:31:20 +02:00
parent a915ddb41c
commit e176ccbab5
29 changed files with 454 additions and 387 deletions

View File

@@ -0,0 +1,52 @@
using System;
using System.Threading.Tasks;
using App.Metrics;
using Npgsql;
using Serilog;
namespace PluralKit.Core
{
public class Database
{
private readonly CoreConfig _config;
private readonly ILogger _logger;
private readonly IMetrics _metrics;
private readonly DbConnectionCountHolder _countHolder;
public Database(CoreConfig config, DbConnectionCountHolder countHolder, ILogger logger,
IMetrics metrics)
{
_config = config;
_countHolder = countHolder;
_metrics = metrics;
_logger = logger;
}
public async Task<IPKConnection> Obtain()
{
// Mark the request (for a handle, I guess) in the metrics
_metrics.Measure.Meter.Mark(CoreMetrics.DatabaseRequests);
// Create a connection and open it
// We wrap it in PKConnection for tracing purposes
var conn = new PKConnection(new NpgsqlConnection(_config.Database), _countHolder, _logger, _metrics);
await conn.OpenAsync();
return conn;
}
public async Task Execute(Func<IPKConnection, Task> func)
{
await using var conn = await Obtain();
await func(conn);
}
public async Task<T> Execute<T>(Func<IPKConnection, Task<T>> func)
{
await using var conn = await Obtain();
return await func(conn);
}
}
}

View File

@@ -8,14 +8,14 @@ namespace PluralKit.Core
{
public static class DatabaseFunctionsExt
{
public static Task<MessageContext> QueryMessageContext(this IDbConnection conn, ulong account, ulong guild, ulong channel)
public static Task<MessageContext> QueryMessageContext(this IPKConnection conn, ulong account, ulong guild, ulong channel)
{
return conn.QueryFirstAsync<MessageContext>("message_context",
new { account_id = account, guild_id = guild, channel_id = channel },
commandType: CommandType.StoredProcedure);
}
public static Task<IEnumerable<ProxyMember>> QueryProxyMembers(this IDbConnection conn, ulong account, ulong guild)
public static Task<IEnumerable<ProxyMember>> QueryProxyMembers(this IPKConnection conn, ulong account, ulong guild)
{
return conn.QueryAsync<ProxyMember>("proxy_members",
new { account_id = account, guild_id = guild },

View File

@@ -16,10 +16,10 @@ namespace PluralKit.Core
private const string RootPath = "PluralKit.Core.Database"; // "resource path" root for SQL files
private const int TargetSchemaVersion = 7;
private DbConnectionFactory _conn;
private Database _conn;
private ILogger _logger;
public Schemas(DbConnectionFactory conn, ILogger logger)
public Schemas(Database conn, ILogger logger)
{
_conn = conn;
_logger = logger.ForContext<Schemas>();
@@ -36,7 +36,7 @@ namespace PluralKit.Core
{
// Run everything in a transaction
await using var conn = await _conn.Obtain();
using var tx = conn.BeginTransaction();
await using var tx = await conn.BeginTransactionAsync();
// Before applying migrations, clean out views/functions to prevent type errors
await ExecuteSqlFile($"{RootPath}.clean.sql", conn, tx);
@@ -49,10 +49,10 @@ namespace PluralKit.Core
await ExecuteSqlFile($"{RootPath}.Functions.functions.sql", conn, tx);
// Finally, commit tx
tx.Commit();
await tx.CommitAsync();
}
private async Task ApplyMigrations(IAsyncDbConnection conn, IDbTransaction tx)
private async Task ApplyMigrations(IPKConnection conn, IDbTransaction tx)
{
var currentVersion = await GetCurrentDatabaseVersion(conn);
_logger.Information("Current schema version: {CurrentVersion}", currentVersion);
@@ -63,7 +63,7 @@ namespace PluralKit.Core
}
}
private async Task ExecuteSqlFile(string resourceName, IDbConnection conn, IDbTransaction tx = null)
private async Task ExecuteSqlFile(string resourceName, IPKConnection conn, IDbTransaction tx = null)
{
await using var stream = typeof(Schemas).Assembly.GetManifestResourceStream(resourceName);
if (stream == null) throw new ArgumentException($"Invalid resource name '{resourceName}'");
@@ -76,10 +76,10 @@ namespace PluralKit.Core
// If the above creates new enum/composite types, we must tell Npgsql to reload the internal type caches
// This will propagate to every other connection as well, since it marks the global type mapper collection dirty.
// TODO: find a way to get around the cast to our internal tracker wrapper... this could break if that ever changes
((PerformanceTrackingConnection) conn)._impl.ReloadTypes();
conn.ReloadTypes();
}
private async Task<int> GetCurrentDatabaseVersion(IDbConnection conn)
private async Task<int> GetCurrentDatabaseVersion(IPKConnection conn)
{
// First, check if the "info" table exists (it may not, if this is a *really* old database)
var hasInfoTable =

View File

@@ -0,0 +1,17 @@
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
namespace PluralKit.Core
{
public interface IPKCommand: IDbCommand, IAsyncDisposable
{
public Task PrepareAsync(CancellationToken ct = default);
public Task<int> ExecuteNonQueryAsync(CancellationToken ct = default);
public Task<object> ExecuteScalarAsync(CancellationToken ct = default);
public Task<DbDataReader> ExecuteReaderAsync(CancellationToken ct = default);
public Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, CancellationToken ct = default);
}
}

View File

@@ -0,0 +1,34 @@
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using Npgsql;
namespace PluralKit.Core
{
public interface IPKConnection: IDbConnection, IAsyncDisposable
{
public Guid ConnectionId { get; }
public Task OpenAsync(CancellationToken cancellationToken = default);
public Task CloseAsync();
public Task ChangeDatabaseAsync(string databaseName, CancellationToken ct = default);
public ValueTask<DbTransaction> BeginTransactionAsync(CancellationToken ct = default) => BeginTransactionAsync(IsolationLevel.Unspecified, ct);
public ValueTask<DbTransaction> BeginTransactionAsync(IsolationLevel level, CancellationToken ct = default);
public NpgsqlBinaryImporter BeginBinaryImport(string copyFromCommand);
public NpgsqlBinaryExporter BeginBinaryExport(string copyToCommand);
public void ReloadTypes();
[Obsolete] new void Open();
[Obsolete] new void Close();
[Obsolete] new IDbTransaction BeginTransaction();
[Obsolete] new IDbTransaction BeginTransaction(IsolationLevel il);
}
}

View File

@@ -0,0 +1,117 @@
#nullable enable
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics;
using NodaTime;
using Npgsql;
using Serilog;
namespace PluralKit.Core
{
public class PKCommand: DbCommand, IPKCommand
{
private readonly NpgsqlCommand _inner;
private readonly PKConnection _ourConnection;
private readonly ILogger _logger;
private readonly IMetrics _metrics;
public PKCommand(NpgsqlCommand inner, PKConnection ourConnection, ILogger logger, IMetrics metrics)
{
_inner = inner;
_ourConnection = ourConnection;
_logger = logger.ForContext<PKCommand>();
_metrics = metrics;
}
public override int ExecuteNonQuery() => throw SyncError(nameof(ExecuteNonQuery));
public override object ExecuteScalar() => throw SyncError(nameof(ExecuteScalar));
protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => throw SyncError(nameof(ExecuteDbDataReader));
public override Task<int> ExecuteNonQueryAsync(CancellationToken ct) => LogQuery(_inner.ExecuteNonQueryAsync(ct));
public override Task<object> ExecuteScalarAsync(CancellationToken ct) => LogQuery(_inner.ExecuteScalarAsync(ct));
protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken ct) => await LogQuery(_inner.ExecuteReaderAsync(behavior, ct));
public override void Prepare() => _inner.Prepare();
public override void Cancel() => _inner.Cancel();
protected override DbParameter CreateDbParameter() => _inner.CreateParameter();
public override string CommandText
{
get => _inner.CommandText;
set => _inner.CommandText = value;
}
public override int CommandTimeout
{
get => _inner.CommandTimeout;
set => _inner.CommandTimeout = value;
}
public override CommandType CommandType
{
get => _inner.CommandType;
set => _inner.CommandType = value;
}
public override UpdateRowSource UpdatedRowSource
{
get => _inner.UpdatedRowSource;
set => _inner.UpdatedRowSource = value;
}
protected override DbParameterCollection DbParameterCollection => _inner.Parameters;
protected override DbTransaction? DbTransaction
{
get => _inner.Transaction;
set => _inner.Transaction = (NpgsqlTransaction?) value;
}
public override bool DesignTimeVisible
{
get => _inner.DesignTimeVisible;
set => _inner.DesignTimeVisible = value;
}
protected override DbConnection? DbConnection
{
get => _inner.Connection;
set =>
_inner.Connection = value switch
{
NpgsqlConnection npg => npg,
PKConnection pk => pk.Inner,
_ => throw new ArgumentException($"Can't convert input type {value?.GetType()} to NpgsqlConnection")
};
}
private async Task<T> LogQuery<T>(Task<T> task)
{
var start = SystemClock.Instance.GetCurrentInstant();
try
{
return await task;
}
finally
{
var end = SystemClock.Instance.GetCurrentInstant();
var elapsed = end - start;
_logger.Verbose("Executed query {Query} in {ElapsedTime} on connection {ConnectionId}", CommandText, elapsed, _ourConnection.ConnectionId);
// One "BCL compatible tick" is 100 nanoseconds
var micros = elapsed.BclCompatibleTicks / 10;
_metrics.Provider.Timer.Instance(CoreMetrics.DatabaseQuery, new MetricTags("query", CommandText))
.Record(micros, TimeUnit.Microseconds, CommandText);
}
}
private static Exception SyncError(string caller) => throw new Exception($"Executed synchronous IPKCommand function {caller}!");
}
}

View File

@@ -0,0 +1,107 @@
#nullable enable
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using App.Metrics;
using NodaTime;
using Npgsql;
using Serilog;
namespace PluralKit.Core
{
public class PKConnection: DbConnection, IPKConnection
{
public NpgsqlConnection Inner { get; }
public Guid ConnectionId { get; }
private readonly DbConnectionCountHolder _countHolder;
private readonly ILogger _logger;
private readonly IMetrics _metrics;
private bool _hasOpened;
private bool _hasClosed;
private Instant _openTime;
public PKConnection(NpgsqlConnection inner, DbConnectionCountHolder countHolder, ILogger logger, IMetrics metrics)
{
Inner = inner;
ConnectionId = Guid.NewGuid();
_countHolder = countHolder;
_logger = logger.ForContext<PKConnection>();
_metrics = metrics;
}
public override Task OpenAsync(CancellationToken ct)
{
if (_hasOpened) return Inner.OpenAsync(ct);
_countHolder.Increment();
_hasOpened = true;
_openTime = SystemClock.Instance.GetCurrentInstant();
_logger.Verbose("Opened database connection {ConnectionId}, new connection count {ConnectionCount}", ConnectionId, _countHolder.ConnectionCount);
return Inner.OpenAsync(ct);
}
public override Task CloseAsync() => Inner.CloseAsync();
protected override DbCommand CreateDbCommand() => new PKCommand(Inner.CreateCommand(), this, _logger, _metrics);
public void ReloadTypes() => Inner.ReloadTypes();
public NpgsqlBinaryImporter BeginBinaryImport(string copyFromCommand) => Inner.BeginBinaryImport(copyFromCommand);
public NpgsqlBinaryExporter BeginBinaryExport(string copyToCommand) => Inner.BeginBinaryExport(copyToCommand);
public override void ChangeDatabase(string databaseName) => Inner.ChangeDatabase(databaseName);
public override Task ChangeDatabaseAsync(string databaseName, CancellationToken ct = default) => Inner.ChangeDatabaseAsync(databaseName, ct);
protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => throw SyncError(nameof(BeginDbTransaction));
protected override async ValueTask<DbTransaction> BeginDbTransactionAsync(IsolationLevel level, CancellationToken ct) => await Inner.BeginTransactionAsync(level, ct);
public override void Open() => throw SyncError(nameof(Open));
public override void Close() => throw SyncError(nameof(Close));
IDbTransaction IPKConnection.BeginTransaction() => throw SyncError(nameof(BeginTransaction));
IDbTransaction IPKConnection.BeginTransaction(IsolationLevel level) => throw SyncError(nameof(BeginTransaction));
public override string ConnectionString
{
get => Inner.ConnectionString;
set => Inner.ConnectionString = value;
}
public override string? Database => Inner.Database;
public override ConnectionState State => Inner.State;
public override string DataSource => Inner.DataSource;
public override string ServerVersion => Inner.ServerVersion;
protected override void Dispose(bool disposing)
{
Inner.Dispose();
if (_hasClosed) return;
LogClose();
}
public override ValueTask DisposeAsync()
{
if (_hasClosed) return Inner.DisposeAsync();
LogClose();
return Inner.DisposeAsync();
}
private void LogClose()
{
_countHolder.Decrement();
_hasClosed = true;
var duration = SystemClock.Instance.GetCurrentInstant() - _openTime;
_logger.Verbose("Closed database connection {ConnectionId} (open for {ConnectionDuration}), new connection count {ConnectionCount}", ConnectionId, duration, _countHolder.ConnectionCount);
}
private static Exception SyncError(string caller) => throw new Exception($"Executed synchronous IPKConnection function {caller}!");
}
}