Bulk import switches for pk;import

We're now using binary import for switches and switch_members when importing a system profile, rather than importing them one switch at a time.

This adds a pass-through method to the PerformanceTrackingConnection that can be used for other bulk import applications.
This commit is contained in:
Noko 2019-10-20 14:38:43 -05:00
parent 3d21adeec9
commit 406f005b4f
3 changed files with 80 additions and 3 deletions

View File

@ -179,9 +179,10 @@ namespace PluralKit.Bot
mappedSwitches.Add(mapped); mappedSwitches.Add(mapped);
} }
// Import switches // Import switches
await _switches.RegisterSwitches(system, mappedSwitches); if (mappedSwitches.Any())
await _switches.BulkImportSwitches(system, mappedSwitches);
_logger.Information("Imported system {System}", system.Id); _logger.Information("Imported system {System}", system.Hid);
return result; return result;
} }
} }

View File

@ -5,7 +5,7 @@ using System.Threading.Tasks;
using App.Metrics.Logging; using App.Metrics.Logging;
using Dapper; using Dapper;
using NodaTime; using NodaTime;
using Npgsql;
using PluralKit.Core; using PluralKit.Core;
using Serilog; using Serilog;
@ -345,6 +345,77 @@ namespace PluralKit {
} }
} }
public async Task BulkImportSwitches(PKSystem system, ICollection<Tuple<Instant, ICollection<PKMember>>> switches)
{
// Read existing switches to enforce unique timestamps
var priorSwitches = await GetSwitches(system);
var lastSwitchId = priorSwitches.Any()
? priorSwitches.Max(x => x.Id)
: 0;
using (var conn = (PerformanceTrackingConnection) await _conn.Obtain())
{
using (var tx = conn.BeginTransaction())
{
// Import switches in bulk
using (var importer = conn.BeginBinaryImport("COPY switches (system, timestamp) FROM STDIN (FORMAT BINARY)"))
{
foreach (var sw in switches)
{
// If there's already a switch at this time, move on
if (priorSwitches.Any(x => x.Timestamp.Equals(sw.Item1)))
continue;
// Otherwise, add it to the importer
importer.StartRow();
importer.Write(system.Id, NpgsqlTypes.NpgsqlDbType.Integer);
importer.Write(sw.Item1, NpgsqlTypes.NpgsqlDbType.Timestamp);
}
importer.Complete(); // Commits the copy operation so dispose won't roll it back
}
// Get all switches that were created above and don't have members for ID lookup
var switchesWithoutMembers =
await conn.QueryAsync<PKSwitch>(@"
SELECT switches.*
FROM switches
LEFT JOIN switch_members
ON switch_members.switch = switches.id
WHERE switches.id > @LastSwitchId
AND switches.system = @System
AND switch_members.id IS NULL", new { LastSwitchId = lastSwitchId, System = system.Id });
// Import switch_members in bulk
using (var importer = conn.BeginBinaryImport("COPY switch_members (switch, member) FROM STDIN (FORMAT BINARY)"))
{
// Iterate over the switches we created above and set their members
foreach (var pkSwitch in switchesWithoutMembers)
{
// If this isn't in our import set, move on
var sw = switches.FirstOrDefault(x => x.Item1.Equals(pkSwitch.Timestamp));
if (sw == null)
continue;
// Loop through associated members to add each to the switch
foreach (var m in sw.Item2)
{
// Skip switch-outs - these don't have switch_members
if (m == null)
continue;
importer.StartRow();
importer.Write(pkSwitch.Id, NpgsqlTypes.NpgsqlDbType.Integer);
importer.Write(m.Id, NpgsqlTypes.NpgsqlDbType.Integer);
}
}
importer.Complete(); // Commits the copy operation so dispose won't roll it back
}
tx.Commit();
}
}
_logger.Information("Completed bulk import of switches for system {0}", system.Hid);
}
public async Task RegisterSwitches(PKSystem system, ICollection<Tuple<Instant, ICollection<PKMember>>> switches) public async Task RegisterSwitches(PKSystem system, ICollection<Tuple<Instant, ICollection<PKMember>>> switches)
{ {
// Use a transaction here since we're doing multiple executed commands in one // Use a transaction here since we're doing multiple executed commands in one

View File

@ -488,6 +488,11 @@ namespace PluralKit
_impl.Open(); _impl.Open();
} }
public NpgsqlBinaryImporter BeginBinaryImport(string copyFromCommand)
{
return _impl.BeginBinaryImport(copyFromCommand);
}
public string ConnectionString public string ConnectionString
{ {
get => _impl.ConnectionString; get => _impl.ConnectionString;