2020-06-11 19:11:50 +00:00
#nullable enable
using System ;
using System.Collections.Generic ;
using System.Collections.Immutable ;
2020-06-12 22:43:48 +00:00
using System.Data ;
2020-06-11 19:11:50 +00:00
using System.Linq ;
using System.Threading.Tasks ;
using Dapper ;
using NodaTime ;
using NpgsqlTypes ;
namespace PluralKit.Core
{
public class BulkImporter : IAsyncDisposable
{
2020-06-14 19:37:04 +00:00
private readonly SystemId _systemId ;
2020-06-13 16:31:20 +00:00
private readonly IPKConnection _conn ;
2020-06-13 16:49:05 +00:00
private readonly IPKTransaction _tx ;
2020-06-14 19:37:04 +00:00
private readonly Dictionary < string , MemberId > _knownMembers = new Dictionary < string , MemberId > ( ) ;
2020-06-11 19:11:50 +00:00
private readonly Dictionary < string , PKMember > _existingMembersByHid = new Dictionary < string , PKMember > ( ) ;
private readonly Dictionary < string , PKMember > _existingMembersByName = new Dictionary < string , PKMember > ( ) ;
2020-06-14 19:37:04 +00:00
private BulkImporter ( SystemId systemId , IPKConnection conn , IPKTransaction tx )
2020-06-11 19:11:50 +00:00
{
_systemId = systemId ;
_conn = conn ;
_tx = tx ;
}
2020-06-13 16:31:20 +00:00
public static async Task < BulkImporter > Begin ( PKSystem system , IPKConnection conn )
2020-06-11 19:11:50 +00:00
{
var tx = await conn . BeginTransactionAsync ( ) ;
var importer = new BulkImporter ( system . Id , conn , tx ) ;
await importer . Begin ( ) ;
return importer ;
}
public async Task Begin ( )
{
// Fetch all members in the system and log their names and hids
var members = await _conn . QueryAsync < PKMember > ( "select id, hid, name from members where system = @System" ,
new { System = _systemId } ) ;
foreach ( var m in members )
{
_existingMembersByHid [ m . Hid ] = m ;
_existingMembersByName [ m . Name ] = m ;
}
}
/// <summary>
/// Checks whether trying to add a member with the given hid and name would result in creating a new member (as opposed to just updating one).
/// </summary>
public bool IsNewMember ( string hid , string name ) = > FindExistingMemberInSystem ( hid , name ) = = null ;
/// <summary>
/// Imports a member into the database
/// </summary>
/// <remarks>If an existing member exists in this system that matches this member in either HID or name, it'll overlay the member information on top of this instead.</remarks>
/// <param name="identifier">An opaque identifier string that refers to this member regardless of source. Is used when importing switches. Value is irrelevant, but should be consistent with the same member later.</param>
/// <param name="member">A member struct containing the data to apply to this member. Null fields will be ignored.</param>
/// <returns>The inserted member object, which may or may not share an ID or HID with the input member.</returns>
public async Task < PKMember > AddMember ( string identifier , PKMember member )
{
// See if we can find a member that matches this one
// if not, roll a new hid and we'll insert one with that
// (we can't trust the hid given in the member, it might let us overwrite another system's members)
var existingMember = FindExistingMemberInSystem ( member . Hid , member . Name ) ;
2020-06-12 22:43:48 +00:00
string newHid = existingMember ? . Hid ? ? await _conn . QuerySingleAsync < string > ( "find_free_member_hid" , commandType : CommandType . StoredProcedure ) ;
2020-06-11 19:11:50 +00:00
// Upsert member data and return the ID
QueryBuilder qb = QueryBuilder . Upsert ( "members" , "hid" )
. Constant ( "hid" , "@Hid" )
. Constant ( "system" , "@System" )
. Variable ( "name" , "@Name" )
. Variable ( "keep_proxy" , "@KeepProxy" ) ;
if ( member . DisplayName ! = null ) qb . Variable ( "display_name" , "@DisplayName" ) ;
if ( member . Description ! = null ) qb . Variable ( "description" , "@Description" ) ;
if ( member . Color ! = null ) qb . Variable ( "color" , "@Color" ) ;
if ( member . AvatarUrl ! = null ) qb . Variable ( "avatar_url" , "@AvatarUrl" ) ;
if ( member . ProxyTags ! = null ) qb . Variable ( "proxy_tags" , "@ProxyTags" ) ;
if ( member . Birthday ! = null ) qb . Variable ( "birthday" , "@Birthday" ) ;
var newMember = await _conn . QueryFirstAsync < PKMember > ( qb . Build ( "returning *" ) ,
new
{
Hid = newHid ,
System = _systemId ,
member . Name ,
member . DisplayName ,
member . Description ,
member . Color ,
member . AvatarUrl ,
member . KeepProxy ,
member . ProxyTags ,
member . Birthday
} ) ;
// Log this member ID by the given identifier
_knownMembers [ identifier ] = newMember . Id ;
return newMember ;
}
private PKMember ? FindExistingMemberInSystem ( string hid , string name )
{
if ( _existingMembersByHid . TryGetValue ( hid , out var byHid ) ) return byHid ;
if ( _existingMembersByName . TryGetValue ( name , out var byName ) ) return byName ;
return null ;
}
/// <summary>
/// Register switches in bulk.
/// </summary>
/// <remarks>This function assumes there are no duplicate switches (ie. switches with the same timestamp).</remarks>
public async Task AddSwitches ( IReadOnlyCollection < SwitchInfo > switches )
{
// Ensure we're aware of all the members we're trying to import from
if ( ! switches . All ( sw = > sw . MemberIdentifiers . All ( m = > _knownMembers . ContainsKey ( m ) ) ) )
throw new ArgumentException ( "One or more switch members haven't been added using this importer" ) ;
// Fetch the existing switches in the database so we can avoid duplicates
var existingSwitches = ( await _conn . QueryAsync < PKSwitch > ( "select * from switches where system = @System" , new { System = _systemId } ) ) . ToList ( ) ;
var existingTimestamps = existingSwitches . Select ( sw = > sw . Timestamp ) . ToImmutableHashSet ( ) ;
2020-06-14 19:37:04 +00:00
var lastSwitchId = existingSwitches . Count ! = 0 ? existingSwitches . Select ( sw = > sw . Id ) . Max ( ) : ( SwitchId ? ) null ;
2020-06-11 19:11:50 +00:00
// Import switch definitions
var importedSwitches = new Dictionary < Instant , SwitchInfo > ( ) ;
await using ( var importer = _conn . BeginBinaryImport ( "copy switches (system, timestamp) from stdin (format binary)" ) )
{
foreach ( var sw in switches )
{
// Don't import duplicate switches
if ( existingTimestamps . Contains ( sw . Timestamp ) ) continue ;
// Otherwise, write to importer
await importer . StartRowAsync ( ) ;
await importer . WriteAsync ( _systemId , NpgsqlDbType . Integer ) ;
await importer . WriteAsync ( sw . Timestamp , NpgsqlDbType . Timestamp ) ;
// Note that we've imported a switch with this timestamp
importedSwitches [ sw . Timestamp ] = sw ;
}
// Commit the import
await importer . CompleteAsync ( ) ;
}
// Now, fetch all the switches we just added (so, now we get their IDs too)
// IDs are sequential, so any ID in this system, with a switch ID > the last max, will be one we just added
var justAddedSwitches = await _conn . QueryAsync < PKSwitch > (
"select * from switches where system = @System and id > @LastSwitchId" ,
2020-06-14 19:37:04 +00:00
new { System = _systemId , LastSwitchId = lastSwitchId ? . Value ? ? - 1 } ) ;
2020-06-11 19:11:50 +00:00
// Lastly, import the switch members
await using ( var importer = _conn . BeginBinaryImport ( "copy switch_members (switch, member) from stdin (format binary)" ) )
{
foreach ( var justAddedSwitch in justAddedSwitches )
{
if ( ! importedSwitches . TryGetValue ( justAddedSwitch . Timestamp , out var switchInfo ) )
throw new Exception ( $"Found 'just-added' switch (by ID) with timestamp {justAddedSwitch.Timestamp}, but this did not correspond to a timestamp we just added a switch entry of! :/" ) ;
// We still assume timestamps are unique and non-duplicate, so:
var members = switchInfo . MemberIdentifiers ;
foreach ( var memberIdentifier in members )
{
if ( ! _knownMembers . TryGetValue ( memberIdentifier , out var memberId ) )
throw new Exception ( $"Attempted to import switch with member identifier {memberIdentifier} but could not find an entry in the id map for this! :/" ) ;
await importer . StartRowAsync ( ) ;
await importer . WriteAsync ( justAddedSwitch . Id , NpgsqlDbType . Integer ) ;
await importer . WriteAsync ( memberId , NpgsqlDbType . Integer ) ;
}
}
await importer . CompleteAsync ( ) ;
}
}
public struct SwitchInfo
{
public Instant Timestamp ;
/// <summary>
/// An ordered list of "member identifiers" matching with the identifier parameter passed to <see cref="BulkImporter.AddMember"/>.
/// </summary>
public IReadOnlyList < string > MemberIdentifiers ;
}
public async ValueTask DisposeAsync ( ) = >
await _tx . CommitAsync ( ) ;
}
}