diff --git a/PluralKit.Core/DataFiles.cs b/PluralKit.Core/DataFiles.cs index 8e15c646..b6e56945 100644 --- a/PluralKit.Core/DataFiles.cs +++ b/PluralKit.Core/DataFiles.cs @@ -179,9 +179,10 @@ namespace PluralKit.Bot mappedSwitches.Add(mapped); } // Import switches - await _switches.RegisterSwitches(system, mappedSwitches); + if (mappedSwitches.Any()) + await _switches.BulkImportSwitches(system, mappedSwitches); - _logger.Information("Imported system {System}", system.Id); + _logger.Information("Imported system {System}", system.Hid); return result; } } diff --git a/PluralKit.Core/Stores.cs b/PluralKit.Core/Stores.cs index 22a978f5..aaef353b 100644 --- a/PluralKit.Core/Stores.cs +++ b/PluralKit.Core/Stores.cs @@ -5,7 +5,7 @@ using System.Threading.Tasks; using App.Metrics.Logging; using Dapper; using NodaTime; - +using Npgsql; using PluralKit.Core; using Serilog; @@ -345,6 +345,77 @@ namespace PluralKit { } } + public async Task BulkImportSwitches(PKSystem system, ICollection>> switches) + { + // Read existing switches to enforce unique timestamps + var priorSwitches = await GetSwitches(system); + var lastSwitchId = priorSwitches.Any() + ? priorSwitches.Max(x => x.Id) + : 0; + + using (var conn = (PerformanceTrackingConnection) await _conn.Obtain()) + { + using (var tx = conn.BeginTransaction()) + { + // Import switches in bulk + using (var importer = conn.BeginBinaryImport("COPY switches (system, timestamp) FROM STDIN (FORMAT BINARY)")) + { + foreach (var sw in switches) + { + // If there's already a switch at this time, move on + if (priorSwitches.Any(x => x.Timestamp.Equals(sw.Item1))) + continue; + + // Otherwise, add it to the importer + importer.StartRow(); + importer.Write(system.Id, NpgsqlTypes.NpgsqlDbType.Integer); + importer.Write(sw.Item1, NpgsqlTypes.NpgsqlDbType.Timestamp); + } + importer.Complete(); // Commits the copy operation so dispose won't roll it back + } + + // Get all switches that were created above and don't have members for ID lookup + var switchesWithoutMembers = + await conn.QueryAsync(@" + SELECT switches.* + FROM switches + LEFT JOIN switch_members + ON switch_members.switch = switches.id + WHERE switches.id > @LastSwitchId + AND switches.system = @System + AND switch_members.id IS NULL", new { LastSwitchId = lastSwitchId, System = system.Id }); + + // Import switch_members in bulk + using (var importer = conn.BeginBinaryImport("COPY switch_members (switch, member) FROM STDIN (FORMAT BINARY)")) + { + // Iterate over the switches we created above and set their members + foreach (var pkSwitch in switchesWithoutMembers) + { + // If this isn't in our import set, move on + var sw = switches.FirstOrDefault(x => x.Item1.Equals(pkSwitch.Timestamp)); + if (sw == null) + continue; + + // Loop through associated members to add each to the switch + foreach (var m in sw.Item2) + { + // Skip switch-outs - these don't have switch_members + if (m == null) + continue; + importer.StartRow(); + importer.Write(pkSwitch.Id, NpgsqlTypes.NpgsqlDbType.Integer); + importer.Write(m.Id, NpgsqlTypes.NpgsqlDbType.Integer); + } + } + importer.Complete(); // Commits the copy operation so dispose won't roll it back + } + tx.Commit(); + } + } + + _logger.Information("Completed bulk import of switches for system {0}", system.Hid); + } + public async Task RegisterSwitches(PKSystem system, ICollection>> switches) { // Use a transaction here since we're doing multiple executed commands in one diff --git a/PluralKit.Core/Utils.cs b/PluralKit.Core/Utils.cs index b71c5b6a..31cd3f1b 100644 --- a/PluralKit.Core/Utils.cs +++ b/PluralKit.Core/Utils.cs @@ -488,6 +488,11 @@ namespace PluralKit _impl.Open(); } + public NpgsqlBinaryImporter BeginBinaryImport(string copyFromCommand) + { + return _impl.BeginBinaryImport(copyFromCommand); + } + public string ConnectionString { get => _impl.ConnectionString;