Move schema migration stuff to Database
This commit is contained in:
parent
1d1b7b8716
commit
f1b28b7fb6
@ -35,7 +35,7 @@ namespace PluralKit.Bot
|
|||||||
|
|
||||||
// "Connect to the database" (ie. set off database migrations and ensure state)
|
// "Connect to the database" (ie. set off database migrations and ensure state)
|
||||||
logger.Information("Connecting to database");
|
logger.Information("Connecting to database");
|
||||||
await services.Resolve<Schemas>().InitializeDatabase();
|
await services.Resolve<IDatabase>().ApplyMigrations();
|
||||||
|
|
||||||
// Init the bot instance itself, register handlers and such to the client before beginning to connect
|
// Init the bot instance itself, register handlers and such to the client before beginning to connect
|
||||||
logger.Information("Initializing bot");
|
logger.Information("Initializing bot");
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Data;
|
using System.Data;
|
||||||
|
using System.IO;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
using App.Metrics;
|
using App.Metrics;
|
||||||
@ -16,6 +17,9 @@ namespace PluralKit.Core
|
|||||||
{
|
{
|
||||||
internal class Database: IDatabase
|
internal class Database: IDatabase
|
||||||
{
|
{
|
||||||
|
private const string RootPath = "PluralKit.Core.Database"; // "resource path" root for SQL files
|
||||||
|
private const int TargetSchemaVersion = 7;
|
||||||
|
|
||||||
private readonly CoreConfig _config;
|
private readonly CoreConfig _config;
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly IMetrics _metrics;
|
private readonly IMetrics _metrics;
|
||||||
@ -52,7 +56,7 @@ namespace PluralKit.Core
|
|||||||
NpgsqlConnection.GlobalTypeMapper.MapComposite<ProxyTag>("proxy_tag");
|
NpgsqlConnection.GlobalTypeMapper.MapComposite<ProxyTag>("proxy_tag");
|
||||||
NpgsqlConnection.GlobalTypeMapper.MapEnum<PrivacyLevel>("privacy_level");
|
NpgsqlConnection.GlobalTypeMapper.MapEnum<PrivacyLevel>("privacy_level");
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<IPKConnection> Obtain()
|
public async Task<IPKConnection> Obtain()
|
||||||
{
|
{
|
||||||
// Mark the request (for a handle, I guess) in the metrics
|
// Mark the request (for a handle, I guess) in the metrics
|
||||||
@ -64,6 +68,69 @@ namespace PluralKit.Core
|
|||||||
await conn.OpenAsync();
|
await conn.OpenAsync();
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task ApplyMigrations()
|
||||||
|
{
|
||||||
|
// Run everything in a transaction
|
||||||
|
await using var conn = await Obtain();
|
||||||
|
await using var tx = await conn.BeginTransactionAsync();
|
||||||
|
|
||||||
|
// Before applying migrations, clean out views/functions to prevent type errors
|
||||||
|
await ExecuteSqlFile($"{RootPath}.clean.sql", conn, tx);
|
||||||
|
|
||||||
|
// Apply all migrations between the current database version and the target version
|
||||||
|
await ApplyMigrations(conn, tx);
|
||||||
|
|
||||||
|
// Now, reapply views/functions (we deleted them above, no need to worry about conflicts)
|
||||||
|
await ExecuteSqlFile($"{RootPath}.views.sql", conn, tx);
|
||||||
|
await ExecuteSqlFile($"{RootPath}.Functions.functions.sql", conn, tx);
|
||||||
|
|
||||||
|
// Finally, commit tx
|
||||||
|
await tx.CommitAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ApplyMigrations(IPKConnection conn, IDbTransaction tx)
|
||||||
|
{
|
||||||
|
var currentVersion = await GetCurrentDatabaseVersion(conn);
|
||||||
|
_logger.Information("Current schema version: {CurrentVersion}", currentVersion);
|
||||||
|
for (var migration = currentVersion + 1; migration <= TargetSchemaVersion; migration++)
|
||||||
|
{
|
||||||
|
_logger.Information("Applying schema migration {MigrationId}", migration);
|
||||||
|
await ExecuteSqlFile($"{RootPath}.Migrations.{migration}.sql", conn, tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ExecuteSqlFile(string resourceName, IPKConnection conn, IDbTransaction tx = null)
|
||||||
|
{
|
||||||
|
await using var stream = typeof(Database).Assembly.GetManifestResourceStream(resourceName);
|
||||||
|
if (stream == null) throw new ArgumentException($"Invalid resource name '{resourceName}'");
|
||||||
|
|
||||||
|
using var reader = new StreamReader(stream);
|
||||||
|
var query = await reader.ReadToEndAsync();
|
||||||
|
|
||||||
|
await conn.ExecuteAsync(query, transaction: tx);
|
||||||
|
|
||||||
|
// If the above creates new enum/composite types, we must tell Npgsql to reload the internal type caches
|
||||||
|
// This will propagate to every other connection as well, since it marks the global type mapper collection dirty.
|
||||||
|
((PKConnection) conn).ReloadTypes();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<int> GetCurrentDatabaseVersion(IPKConnection conn)
|
||||||
|
{
|
||||||
|
// First, check if the "info" table exists (it may not, if this is a *really* old database)
|
||||||
|
var hasInfoTable =
|
||||||
|
await conn.QuerySingleOrDefaultAsync<int>(
|
||||||
|
"select count(*) from information_schema.tables where table_name = 'info'") == 1;
|
||||||
|
|
||||||
|
// If we have the table, read the schema version
|
||||||
|
if (hasInfoTable)
|
||||||
|
return await conn.QuerySingleOrDefaultAsync<int>("select schema_version from info");
|
||||||
|
|
||||||
|
// If not, we return version "-1"
|
||||||
|
// This means migration 0 will get executed, getting us into a consistent state
|
||||||
|
// Then, migration 1 gets executed, which creates the info table and sets version to 1
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
private class PassthroughTypeHandler<T>: SqlMapper.TypeHandler<T>
|
private class PassthroughTypeHandler<T>: SqlMapper.TypeHandler<T>
|
||||||
{
|
{
|
||||||
|
@ -4,6 +4,7 @@ namespace PluralKit.Core
|
|||||||
{
|
{
|
||||||
public interface IDatabase
|
public interface IDatabase
|
||||||
{
|
{
|
||||||
|
Task ApplyMigrations();
|
||||||
Task<IPKConnection> Obtain();
|
Task<IPKConnection> Obtain();
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,92 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Data;
|
|
||||||
using System.IO;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
using Dapper;
|
|
||||||
|
|
||||||
using Npgsql;
|
|
||||||
|
|
||||||
using Serilog;
|
|
||||||
|
|
||||||
namespace PluralKit.Core
|
|
||||||
{
|
|
||||||
public class Schemas
|
|
||||||
{
|
|
||||||
private const string RootPath = "PluralKit.Core.Database"; // "resource path" root for SQL files
|
|
||||||
private const int TargetSchemaVersion = 7;
|
|
||||||
|
|
||||||
private IDatabase _conn;
|
|
||||||
private ILogger _logger;
|
|
||||||
|
|
||||||
public Schemas(IDatabase conn, ILogger logger)
|
|
||||||
{
|
|
||||||
_conn = conn;
|
|
||||||
_logger = logger.ForContext<Schemas>();
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task InitializeDatabase()
|
|
||||||
{
|
|
||||||
// Run everything in a transaction
|
|
||||||
await using var conn = await _conn.Obtain();
|
|
||||||
await using var tx = await conn.BeginTransactionAsync();
|
|
||||||
|
|
||||||
// Before applying migrations, clean out views/functions to prevent type errors
|
|
||||||
await ExecuteSqlFile($"{RootPath}.clean.sql", conn, tx);
|
|
||||||
|
|
||||||
// Apply all migrations between the current database version and the target version
|
|
||||||
await ApplyMigrations(conn, tx);
|
|
||||||
|
|
||||||
// Now, reapply views/functions (we deleted them above, no need to worry about conflicts)
|
|
||||||
await ExecuteSqlFile($"{RootPath}.views.sql", conn, tx);
|
|
||||||
await ExecuteSqlFile($"{RootPath}.Functions.functions.sql", conn, tx);
|
|
||||||
|
|
||||||
// Finally, commit tx
|
|
||||||
await tx.CommitAsync();
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task ApplyMigrations(IPKConnection conn, IDbTransaction tx)
|
|
||||||
{
|
|
||||||
var currentVersion = await GetCurrentDatabaseVersion(conn);
|
|
||||||
_logger.Information("Current schema version: {CurrentVersion}", currentVersion);
|
|
||||||
for (var migration = currentVersion + 1; migration <= TargetSchemaVersion; migration++)
|
|
||||||
{
|
|
||||||
_logger.Information("Applying schema migration {MigrationId}", migration);
|
|
||||||
await ExecuteSqlFile($"{RootPath}.Migrations.{migration}.sql", conn, tx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task ExecuteSqlFile(string resourceName, IPKConnection conn, IDbTransaction tx = null)
|
|
||||||
{
|
|
||||||
await using var stream = typeof(Schemas).Assembly.GetManifestResourceStream(resourceName);
|
|
||||||
if (stream == null) throw new ArgumentException($"Invalid resource name '{resourceName}'");
|
|
||||||
|
|
||||||
using var reader = new StreamReader(stream);
|
|
||||||
var query = await reader.ReadToEndAsync();
|
|
||||||
|
|
||||||
await conn.ExecuteAsync(query, transaction: tx);
|
|
||||||
|
|
||||||
// If the above creates new enum/composite types, we must tell Npgsql to reload the internal type caches
|
|
||||||
// This will propagate to every other connection as well, since it marks the global type mapper collection dirty.
|
|
||||||
// TODO: find a way to get around the cast to our internal tracker wrapper... this could break if that ever changes
|
|
||||||
conn.ReloadTypes();
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<int> GetCurrentDatabaseVersion(IPKConnection conn)
|
|
||||||
{
|
|
||||||
// First, check if the "info" table exists (it may not, if this is a *really* old database)
|
|
||||||
var hasInfoTable =
|
|
||||||
await conn.QuerySingleOrDefaultAsync<int>(
|
|
||||||
"select count(*) from information_schema.tables where table_name = 'info'") == 1;
|
|
||||||
|
|
||||||
// If we have the table, read the schema version
|
|
||||||
if (hasInfoTable)
|
|
||||||
return await conn.QuerySingleOrDefaultAsync<int>("select schema_version from info");
|
|
||||||
|
|
||||||
// If not, we return version "-1"
|
|
||||||
// This means migration 0 will get executed, getting us into a consistent state
|
|
||||||
// Then, migration 1 gets executed, which creates the info table and sets version to 1
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -22,9 +22,7 @@ namespace PluralKit.Core
|
|||||||
|
|
||||||
public NpgsqlBinaryImporter BeginBinaryImport(string copyFromCommand);
|
public NpgsqlBinaryImporter BeginBinaryImport(string copyFromCommand);
|
||||||
public NpgsqlBinaryExporter BeginBinaryExport(string copyToCommand);
|
public NpgsqlBinaryExporter BeginBinaryExport(string copyToCommand);
|
||||||
|
|
||||||
public void ReloadTypes();
|
|
||||||
|
|
||||||
[Obsolete] new void Open();
|
[Obsolete] new void Open();
|
||||||
[Obsolete] new void Close();
|
[Obsolete] new void Close();
|
||||||
|
|
||||||
|
@ -23,9 +23,8 @@ namespace PluralKit.Core
|
|||||||
protected override void Load(ContainerBuilder builder)
|
protected override void Load(ContainerBuilder builder)
|
||||||
{
|
{
|
||||||
builder.RegisterType<DbConnectionCountHolder>().SingleInstance();
|
builder.RegisterType<DbConnectionCountHolder>().SingleInstance();
|
||||||
builder.RegisterType<Database>().AsSelf().SingleInstance();
|
builder.RegisterType<Database>().As<IDatabase>().SingleInstance();
|
||||||
builder.RegisterType<PostgresDataStore>().AsSelf().As<IDataStore>();
|
builder.RegisterType<PostgresDataStore>().AsSelf().As<IDataStore>();
|
||||||
builder.RegisterType<Schemas>().AsSelf();
|
|
||||||
|
|
||||||
builder.Populate(new ServiceCollection().AddMemoryCache());
|
builder.Populate(new ServiceCollection().AddMemoryCache());
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user