move database utils to Database/Utils, create DatabaseMigrator
This commit is contained in:
@@ -18,21 +18,21 @@ namespace PluralKit.Core
|
||||
{
|
||||
internal class Database: IDatabase
|
||||
{
|
||||
private const string RootPath = "PluralKit.Core.Database"; // "resource path" root for SQL files
|
||||
private const int TargetSchemaVersion = 14;
|
||||
|
||||
private readonly CoreConfig _config;
|
||||
private readonly ILogger _logger;
|
||||
private readonly IMetrics _metrics;
|
||||
private readonly DbConnectionCountHolder _countHolder;
|
||||
private readonly DatabaseMigrator _migrator;
|
||||
private readonly string _connectionString;
|
||||
|
||||
public Database(CoreConfig config, DbConnectionCountHolder countHolder, ILogger logger,
|
||||
IMetrics metrics)
|
||||
IMetrics metrics, DatabaseMigrator migrator)
|
||||
{
|
||||
_config = config;
|
||||
_countHolder = countHolder;
|
||||
_metrics = metrics;
|
||||
_migrator = migrator;
|
||||
_logger = logger.ForContext<Database>();
|
||||
|
||||
_connectionString = new NpgsqlConnectionStringBuilder(_config.Database)
|
||||
@@ -92,65 +92,8 @@ namespace PluralKit.Core
|
||||
|
||||
public async Task ApplyMigrations()
|
||||
{
|
||||
// Run everything in a transaction
|
||||
await using var conn = await Obtain();
|
||||
await using var tx = await conn.BeginTransactionAsync();
|
||||
|
||||
// Before applying migrations, clean out views/functions to prevent type errors
|
||||
await ExecuteSqlFile($"{RootPath}.clean.sql", conn, tx);
|
||||
|
||||
// Apply all migrations between the current database version and the target version
|
||||
await ApplyMigrations(conn, tx);
|
||||
|
||||
// Now, reapply views/functions (we deleted them above, no need to worry about conflicts)
|
||||
await ExecuteSqlFile($"{RootPath}.Views.views.sql", conn, tx);
|
||||
await ExecuteSqlFile($"{RootPath}.Functions.functions.sql", conn, tx);
|
||||
|
||||
// Finally, commit tx
|
||||
await tx.CommitAsync();
|
||||
}
|
||||
|
||||
private async Task ApplyMigrations(IPKConnection conn, IDbTransaction tx)
|
||||
{
|
||||
var currentVersion = await GetCurrentDatabaseVersion(conn);
|
||||
_logger.Information("Current schema version: {CurrentVersion}", currentVersion);
|
||||
for (var migration = currentVersion + 1; migration <= TargetSchemaVersion; migration++)
|
||||
{
|
||||
_logger.Information("Applying schema migration {MigrationId}", migration);
|
||||
await ExecuteSqlFile($"{RootPath}.Migrations.{migration}.sql", conn, tx);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ExecuteSqlFile(string resourceName, IPKConnection conn, IDbTransaction tx = null)
|
||||
{
|
||||
await using var stream = typeof(Database).Assembly.GetManifestResourceStream(resourceName);
|
||||
if (stream == null) throw new ArgumentException($"Invalid resource name '{resourceName}'");
|
||||
|
||||
using var reader = new StreamReader(stream);
|
||||
var query = await reader.ReadToEndAsync();
|
||||
|
||||
await conn.ExecuteAsync(query, transaction: tx);
|
||||
|
||||
// If the above creates new enum/composite types, we must tell Npgsql to reload the internal type caches
|
||||
// This will propagate to every other connection as well, since it marks the global type mapper collection dirty.
|
||||
((PKConnection) conn).ReloadTypes();
|
||||
}
|
||||
|
||||
private async Task<int> GetCurrentDatabaseVersion(IPKConnection conn)
|
||||
{
|
||||
// First, check if the "info" table exists (it may not, if this is a *really* old database)
|
||||
var hasInfoTable =
|
||||
await conn.QuerySingleOrDefaultAsync<int>(
|
||||
"select count(*) from information_schema.tables where table_name = 'info'") == 1;
|
||||
|
||||
// If we have the table, read the schema version
|
||||
if (hasInfoTable)
|
||||
return await conn.QuerySingleOrDefaultAsync<int>("select schema_version from info");
|
||||
|
||||
// If not, we return version "-1"
|
||||
// This means migration 0 will get executed, getting us into a consistent state
|
||||
// Then, migration 1 gets executed, which creates the info table and sets version to 1
|
||||
return -1;
|
||||
using var conn = await Obtain();
|
||||
await _migrator.ApplyMigrations(conn);
|
||||
}
|
||||
|
||||
private class PassthroughTypeHandler<T>: SqlMapper.TypeHandler<T>
|
||||
|
18
PluralKit.Core/Database/Utils/ConnectionUtils.cs
Normal file
18
PluralKit.Core/Database/Utils/ConnectionUtils.cs
Normal file
@@ -0,0 +1,18 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Data.Common;
|
||||
|
||||
using Dapper;
|
||||
|
||||
namespace PluralKit.Core {
|
||||
public static class ConnectionUtils
|
||||
{
|
||||
public static async IAsyncEnumerable<T> QueryStreamAsync<T>(this IPKConnection conn, string sql, object param)
|
||||
{
|
||||
await using var reader = (DbDataReader) await conn.ExecuteReaderAsync(sql, param);
|
||||
var parser = reader.GetRowParser<T>();
|
||||
|
||||
while (await reader.ReadAsync())
|
||||
yield return parser(reader);
|
||||
}
|
||||
}
|
||||
}
|
85
PluralKit.Core/Database/Utils/DatabaseMigrator.cs
Normal file
85
PluralKit.Core/Database/Utils/DatabaseMigrator.cs
Normal file
@@ -0,0 +1,85 @@
|
||||
using System;
|
||||
using System.Data;
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
using Dapper;
|
||||
|
||||
using Serilog;
|
||||
|
||||
namespace PluralKit.Core
|
||||
{
|
||||
internal class DatabaseMigrator
|
||||
{
|
||||
private const string RootPath = "PluralKit.Core.Database"; // "resource path" root for SQL files
|
||||
private const int TargetSchemaVersion = 14;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
public DatabaseMigrator(ILogger logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task ApplyMigrations(IPKConnection conn)
|
||||
{
|
||||
// Run everything in a transaction
|
||||
await using var tx = await conn.BeginTransactionAsync();
|
||||
|
||||
// Before applying migrations, clean out views/functions to prevent type errors
|
||||
await ExecuteSqlFile($"{RootPath}.clean.sql", conn, tx);
|
||||
|
||||
// Apply all migrations between the current database version and the target version
|
||||
await ApplyMigrations(conn, tx);
|
||||
|
||||
// Now, reapply views/functions (we deleted them above, no need to worry about conflicts)
|
||||
await ExecuteSqlFile($"{RootPath}.Views.views.sql", conn, tx);
|
||||
await ExecuteSqlFile($"{RootPath}.Functions.functions.sql", conn, tx);
|
||||
|
||||
// Finally, commit tx
|
||||
await tx.CommitAsync();
|
||||
}
|
||||
|
||||
private async Task ApplyMigrations(IPKConnection conn, IDbTransaction tx)
|
||||
{
|
||||
var currentVersion = await GetCurrentDatabaseVersion(conn);
|
||||
_logger.Information("Current schema version: {CurrentVersion}", currentVersion);
|
||||
for (var migration = currentVersion + 1; migration <= TargetSchemaVersion; migration++)
|
||||
{
|
||||
_logger.Information("Applying schema migration {MigrationId}", migration);
|
||||
await ExecuteSqlFile($"{RootPath}.Migrations.{migration}.sql", conn, tx);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ExecuteSqlFile(string resourceName, IPKConnection conn, IDbTransaction tx = null)
|
||||
{
|
||||
await using var stream = typeof(Database).Assembly.GetManifestResourceStream(resourceName);
|
||||
if (stream == null) throw new ArgumentException($"Invalid resource name '{resourceName}'");
|
||||
|
||||
using var reader = new StreamReader(stream);
|
||||
var query = await reader.ReadToEndAsync();
|
||||
|
||||
await conn.ExecuteAsync(query, transaction: tx);
|
||||
|
||||
// If the above creates new enum/composite types, we must tell Npgsql to reload the internal type caches
|
||||
// This will propagate to every other connection as well, since it marks the global type mapper collection dirty.
|
||||
((PKConnection) conn).ReloadTypes();
|
||||
}
|
||||
|
||||
private async Task<int> GetCurrentDatabaseVersion(IPKConnection conn)
|
||||
{
|
||||
// First, check if the "info" table exists (it may not, if this is a *really* old database)
|
||||
var hasInfoTable =
|
||||
await conn.QuerySingleOrDefaultAsync<int>(
|
||||
"select count(*) from information_schema.tables where table_name = 'info'") == 1;
|
||||
|
||||
// If we have the table, read the schema version
|
||||
if (hasInfoTable)
|
||||
return await conn.QuerySingleOrDefaultAsync<int>("select schema_version from info");
|
||||
|
||||
// If not, we return version "-1"
|
||||
// This means migration 0 will get executed, getting us into a consistent state
|
||||
// Then, migration 1 gets executed, which creates the info table and sets version to 1
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
20
PluralKit.Core/Database/Utils/DbConnectionCountHolder.cs
Normal file
20
PluralKit.Core/Database/Utils/DbConnectionCountHolder.cs
Normal file
@@ -0,0 +1,20 @@
|
||||
using System.Threading;
|
||||
|
||||
namespace PluralKit.Core
|
||||
{
|
||||
public class DbConnectionCountHolder
|
||||
{
|
||||
private int _connectionCount;
|
||||
public int ConnectionCount => _connectionCount;
|
||||
|
||||
public void Increment()
|
||||
{
|
||||
Interlocked.Increment(ref _connectionCount);
|
||||
}
|
||||
|
||||
public void Decrement()
|
||||
{
|
||||
Interlocked.Decrement(ref _connectionCount);
|
||||
}
|
||||
}
|
||||
}
|
88
PluralKit.Core/Database/Utils/QueryBuilder.cs
Normal file
88
PluralKit.Core/Database/Utils/QueryBuilder.cs
Normal file
@@ -0,0 +1,88 @@
|
||||
#nullable enable
|
||||
using System;
|
||||
using System.Text;
|
||||
|
||||
namespace PluralKit.Core
|
||||
{
|
||||
public class QueryBuilder
|
||||
{
|
||||
private readonly string? _conflictField;
|
||||
private readonly string? _condition;
|
||||
private readonly StringBuilder _insertFragment = new StringBuilder();
|
||||
private readonly StringBuilder _valuesFragment = new StringBuilder();
|
||||
private readonly StringBuilder _updateFragment = new StringBuilder();
|
||||
private bool _firstInsert = true;
|
||||
private bool _firstUpdate = true;
|
||||
public QueryType Type { get; }
|
||||
public string Table { get; }
|
||||
|
||||
private QueryBuilder(QueryType type, string table, string? conflictField, string? condition)
|
||||
{
|
||||
Type = type;
|
||||
Table = table;
|
||||
_conflictField = conflictField;
|
||||
_condition = condition;
|
||||
}
|
||||
|
||||
public static QueryBuilder Insert(string table) => new QueryBuilder(QueryType.Insert, table, null, null);
|
||||
public static QueryBuilder Update(string table, string condition) => new QueryBuilder(QueryType.Update, table, null, condition);
|
||||
public static QueryBuilder Upsert(string table, string conflictField) => new QueryBuilder(QueryType.Upsert, table, conflictField, null);
|
||||
|
||||
public QueryBuilder Constant(string fieldName, string paramName)
|
||||
{
|
||||
if (_firstInsert) _firstInsert = false;
|
||||
else
|
||||
{
|
||||
_insertFragment.Append(", ");
|
||||
_valuesFragment.Append(", ");
|
||||
}
|
||||
|
||||
_insertFragment.Append(fieldName);
|
||||
_valuesFragment.Append(paramName);
|
||||
return this;
|
||||
}
|
||||
|
||||
public QueryBuilder Variable(string fieldName, string paramName)
|
||||
{
|
||||
Constant(fieldName, paramName);
|
||||
|
||||
if (_firstUpdate) _firstUpdate = false;
|
||||
else _updateFragment.Append(", ");
|
||||
|
||||
_updateFragment.Append(fieldName);
|
||||
_updateFragment.Append(" = ");
|
||||
_updateFragment.Append(paramName);
|
||||
return this;
|
||||
}
|
||||
|
||||
public string Build(string? suffix = null)
|
||||
{
|
||||
if (_firstInsert)
|
||||
throw new ArgumentException("No fields have been added to the query.");
|
||||
|
||||
StringBuilder query = new StringBuilder(Type switch
|
||||
{
|
||||
QueryType.Insert => $"insert into {Table} ({_insertFragment}) values ({_valuesFragment})",
|
||||
QueryType.Upsert => $"insert into {Table} ({_insertFragment}) values ({_valuesFragment}) on conflict ({_conflictField}) do update set {_updateFragment}",
|
||||
QueryType.Update => $"update {Table} set {_updateFragment}",
|
||||
_ => throw new ArgumentOutOfRangeException($"Unknown query type {Type}")
|
||||
});
|
||||
|
||||
if (Type == QueryType.Update && _condition != null)
|
||||
query.Append($" where {_condition}");
|
||||
|
||||
if (!string.IsNullOrEmpty(suffix))
|
||||
query.Append($" {suffix}");
|
||||
query.Append(";");
|
||||
|
||||
return query.ToString();
|
||||
}
|
||||
|
||||
public enum QueryType
|
||||
{
|
||||
Insert,
|
||||
Update,
|
||||
Upsert
|
||||
}
|
||||
}
|
||||
}
|
45
PluralKit.Core/Database/Utils/UpdateQueryBuilder.cs
Normal file
45
PluralKit.Core/Database/Utils/UpdateQueryBuilder.cs
Normal file
@@ -0,0 +1,45 @@
|
||||
using System.Text;
|
||||
|
||||
using Dapper;
|
||||
|
||||
namespace PluralKit.Core
|
||||
{
|
||||
public class UpdateQueryBuilder
|
||||
{
|
||||
private readonly QueryBuilder _qb;
|
||||
private readonly DynamicParameters _params = new DynamicParameters();
|
||||
|
||||
private UpdateQueryBuilder(QueryBuilder qb)
|
||||
{
|
||||
_qb = qb;
|
||||
}
|
||||
|
||||
public static UpdateQueryBuilder Insert(string table) => new UpdateQueryBuilder(QueryBuilder.Insert(table));
|
||||
public static UpdateQueryBuilder Update(string table, string condition) => new UpdateQueryBuilder(QueryBuilder.Update(table, condition));
|
||||
public static UpdateQueryBuilder Upsert(string table, string conflictField) => new UpdateQueryBuilder(QueryBuilder.Upsert(table, conflictField));
|
||||
|
||||
public UpdateQueryBuilder WithConstant<T>(string name, T value)
|
||||
{
|
||||
_params.Add(name, value);
|
||||
_qb.Constant(name, $"@{name}");
|
||||
return this;
|
||||
}
|
||||
|
||||
public UpdateQueryBuilder With<T>(string columnName, T value)
|
||||
{
|
||||
_params.Add(columnName, value);
|
||||
_qb.Variable(columnName, $"@{columnName}");
|
||||
return this;
|
||||
}
|
||||
|
||||
public UpdateQueryBuilder With<T>(string columnName, Partial<T> partialValue)
|
||||
{
|
||||
return partialValue.IsPresent ? With(columnName, partialValue.Value) : this;
|
||||
}
|
||||
|
||||
public (string Query, DynamicParameters Parameters) Build(string suffix = "")
|
||||
{
|
||||
return (_qb.Build(suffix), _params);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user