2021-08-26 01:43:31 +00:00
using System.Collections.Immutable ;
using Dapper ;
using Newtonsoft.Json.Linq ;
using NodaTime ;
using NpgsqlTypes ;
2021-11-27 02:10:56 +00:00
namespace PluralKit.Core ;
public partial class BulkImporter
2021-08-26 01:43:31 +00:00
{
2021-11-27 02:10:56 +00:00
/// <summary>
/// Imports a PluralKit-format export: applies the system and system-config patches parsed from
/// <paramref name="importFile"/>, then imports its members, groups and switches in that order.
/// </summary>
/// <param name="importFile">The parsed export file. The "config", "timezone", "members", "groups"
/// and "switches" keys are all treated as optional.</param>
/// <returns>The importer's accumulated result object.</returns>
/// <exception cref="ImportException">
/// Thrown when the system or config patch fails validation, or when a switch references a member
/// identifier that is not known after the members have been imported.
/// </exception>
private async Task<ImportResultNew> ImportPluralKit(JObject importFile)
{
    var patch = SystemPatch.FromJSON(importFile);
    patch.AssertIsValid();
    if (patch.Errors.Count > 0)
    {
        // Only the first validation error is reported.
        var err = patch.Errors[0];
        if (err is FieldTooLongError tooLong)
            throw new ImportException($"Field {err.Key} in export file is too long "
                                      + $"({tooLong.ActualLength} > {tooLong.MaxLength}).");
        if (err.Text != null)
            throw new ImportException(err.Text);
        throw new ImportException($"Field {err.Key} in export file is invalid.");
    }

    await _repo.UpdateSystem(_system.Id, patch, _conn);

    var configPatch = new SystemConfigPatch();
    if (importFile.ContainsKey("config"))
        configPatch = SystemConfigPatch.FromJson(importFile.Value<JObject>("config"));
    // A top-level "timezone" key overrides whatever the "config" object supplied.
    if (importFile.ContainsKey("timezone"))
        configPatch.UiTz = importFile.Value<string>("timezone");
    configPatch.AssertIsValid();
    if (configPatch.Errors.Count > 0)
        // Report the config patch's own error: the system patch has no errors at this point,
        // so indexing into patch.Errors here would throw instead of producing this message.
        throw new ImportException($"Field config.{configPatch.Errors[0].Key} in export file is invalid.");
    await _repo.UpdateSystemConfig(_system.Id, configPatch, _conn);

    // Missing (or JSON-null) "members"/"switches" arrays are treated as empty, mirroring the
    // optional handling of "groups" below, rather than failing with a null dereference.
    var members = importFile.Value<JArray>("members") ?? new JArray();
    var groups = importFile.Value<JArray>("groups");
    var switches = importFile.Value<JArray>("switches") ?? new JArray();

    // Count only the entries that do not match an existing member, then check the limit.
    var newMembers = members.Count(m =>
    {
        var (found, _) = TryGetExistingMember(m.Value<string>("id"), m.Value<string>("name"));
        return found == null;
    });
    await AssertMemberLimitNotReached(newMembers);

    if (groups != null)
    {
        var newGroups = groups.Count(g =>
        {
            var (found, _) = TryGetExistingGroup(g.Value<string>("id"), g.Value<string>("name"));
            return found == null;
        });
        await AssertGroupLimitNotReached(newGroups);
    }

    // Members go first: ImportGroup and the switch check below resolve member identifiers
    // through _knownMemberIdentifiers, which ImportMember fills in.
    foreach (JObject member in members)
        await ImportMember(member);

    if (groups != null)
        foreach (JObject group in groups)
            await ImportGroup(group);

    if (switches.Any(sw =>
            sw.Value<JArray>("members").Any(m => !_knownMemberIdentifiers.ContainsKey((string)m))))
        throw new ImportException("One or more switches include members that haven't been imported.");

    await ImportSwitches(switches);

    return _result;
}
2021-08-26 01:43:31 +00:00
2021-11-27 02:10:56 +00:00
/// <summary>
/// Imports one member object from the export file: creates the member if no existing one
/// matches, records its identifier in <c>_knownMemberIdentifiers</c>, and applies the patch
/// parsed from the JSON.
/// </summary>
/// <param name="member">The member's JSON object; its "id" and "name" values are used for matching.</param>
/// <exception cref="ImportException">Thrown when the parsed member patch fails validation.</exception>
private async Task ImportMember(JObject member)
{
    var id = member.Value<string>("id");
    var name = member.Value<string>("name");

    var (existingId, matchedByHid) = TryGetExistingMember(id, name);
    var creating = existingId == null;

    // The counters are bumped before validation, so a member that later fails validation
    // has already been counted by the time the exception is thrown.
    if (creating)
        _result.Added++;
    else
        _result.Modified++;

    var fileIdentifier = matchedByHid ? id : name;
    _logger.Debug(
        "Importing member with identifier {FileId} to system {System} (is creating new member? {IsCreatingNewMember})",
        fileIdentifier, _system.Id, creating
    );

    var patch = MemberPatch.FromJSON(member);
    patch.AssertIsValid();
    if (patch.Errors.Count > 0)
    {
        // Only the first validation error is surfaced.
        var firstError = patch.Errors[0];
        var message = firstError switch
        {
            FieldTooLongError tooLong =>
                $"Field {firstError.Key} in member {name} is too long "
                + $"({tooLong.ActualLength} > {tooLong.MaxLength}).",
            { Text: not null } => $"member {name}: {firstError.Text}",
            _ => $"Field {firstError.Key} in member {name} is invalid."
        };
        throw new ImportException(message);
    }

    var targetId = existingId;
    if (creating)
    {
        // The message count is taken from the file only when the member is newly created.
        patch.MessageCount = member.Value<int>("message_count");
        var created = await _repo.CreateMember(_system.Id, patch.Name.Value, _conn);
        targetId = created.Id;
    }

    // Remember which database member this file identifier maps to, for groups and switches.
    _knownMemberIdentifiers[id] = targetId.Value;

    await _repo.UpdateMember(targetId.Value, patch, _conn);
}
2021-10-13 12:37:34 +00:00
2021-11-27 02:10:56 +00:00
/// <summary>
/// Imports one group object from the export file: creates the group if no existing one
/// matches, applies the patch parsed from the JSON, and adds any listed members that are not
/// already in the group.
/// </summary>
/// <param name="group">The group's JSON object; "id" and "name" are used for matching, and the
/// optional "members" array lists member identifiers from the same file.</param>
/// <exception cref="ImportException">Thrown when the parsed group patch fails validation.</exception>
/// <exception cref="Exception">Thrown when a listed member identifier is not present in
/// <c>_knownMemberIdentifiers</c>.</exception>
private async Task ImportGroup(JObject group)
{
    var id = group.Value<string>("id");
    var name = group.Value<string>("name");

    var (found, isHidExisting) = TryGetExistingGroup(id, name);
    var isNewGroup = found == null;
    var referenceName = isHidExisting ? id : name;

    _logger.Debug(
        "Importing group with identifier {FileId} to system {System} (is creating new group? {IsCreatingNewGroup})",
        referenceName, _system.Id, isNewGroup
    );

    var patch = GroupPatch.FromJson(group);

    patch.AssertIsValid();
    if (patch.Errors.Count > 0)
    {
        // Only the first validation error is reported.
        var err = patch.Errors[0];
        if (err is FieldTooLongError tooLong)
            throw new ImportException($"Field {err.Key} in group {name} is too long "
                                      + $"({tooLong.ActualLength} > {tooLong.MaxLength}).");
        if (err.Text != null)
            throw new ImportException($"group {name}: {err.Text}");
        throw new ImportException($"Field {err.Key} in group {name} is invalid.");
    }

    var groupId = found;
    if (isNewGroup)
    {
        var newGroup = await _repo.CreateGroup(_system.Id, patch.Name.Value, _conn);
        groupId = newGroup.Id;
    }

    _knownGroupIdentifiers[id] = groupId.Value;

    await _repo.UpdateGroup(groupId.Value, patch, _conn);

    // A group without a "members" array (or with a JSON null) simply has nothing to add.
    var groupMembers = group.Value<JArray>("members") ?? new JArray();

    // Members already linked to this group. Kept as a set so the per-member check below is a
    // hash lookup, and so identifiers repeated within the file are written only once.
    var membersInGroup = new HashSet<MemberId>(await _conn.QueryAsync<MemberId>(
        "select member_id from group_members where group_id = @groupId",
        new { groupId = groupId.Value }
    ));

    await using (var importer =
                 _conn.BeginBinaryImport("copy group_members (group_id, member_id) from stdin (format binary)"))
    {
        foreach (var memberIdentifier in groupMembers)
        {
            if (!_knownMemberIdentifiers.TryGetValue(memberIdentifier.ToString(), out var memberId))
                throw new Exception(
                    $"Attempted to import group member with member identifier {memberIdentifier} but could not find a recently imported member with this id!");

            // Add returns false both for members that were already in the group and for
            // identifiers listed more than once in the file; either way there is nothing to write.
            if (!membersInGroup.Add(memberId))
                continue;

            await importer.StartRowAsync();
            await importer.WriteAsync(groupId.Value.Value, NpgsqlDbType.Integer);
            await importer.WriteAsync(memberId.Value, NpgsqlDbType.Integer);
        }

        await importer.CompleteAsync();
    }
}
2021-09-22 01:42:41 +00:00
2021-11-27 02:10:56 +00:00
/// <summary>
/// Imports the switches listed in the export file: bulk-copies the switch rows whose timestamps
/// do not exist yet, reads them back to learn their IDs, then bulk-copies their member links.
/// </summary>
/// <param name="switches">JSON array of switch objects, each with a "timestamp" string and a
/// "members" array of member identifiers from the same file.</param>
/// <exception cref="ImportException">
/// Thrown when there are more than 10000 switches, a timestamp cannot be parsed, or a switch
/// lists more than <c>Limits.MaxSwitchMemberCount</c> members.
/// </exception>
/// <exception cref="Exception">
/// Thrown when a re-read switch has no matching imported timestamp, or a member identifier is
/// not present in <c>_knownMemberIdentifiers</c>.
/// </exception>
private async Task ImportSwitches(JArray switches)
{
    var existingSwitches =
        (await _conn.QueryAsync<PKSwitch>("select * from switches where system = @System",
            new { System = _system.Id })).ToList();
    var existingTimestamps = existingSwitches.Select(sw => sw.Timestamp).ToImmutableHashSet();
    // Highest switch ID before this import; used below to recognise the rows we add.
    var lastSwitchId = existingSwitches.Count != 0
        ? existingSwitches.Select(sw => sw.Id).Max()
        : (SwitchId?)null;

    if (switches.Count > 10000)
        throw new ImportException("Too many switches present in import file.");

    // Import switch definitions
    var importedSwitches = new Dictionary<Instant, JArray>();
    await using (var importer =
                 _conn.BeginBinaryImport("copy switches (system, timestamp) from stdin (format binary)"))
    {
        foreach (var sw in switches)
        {
            var timestampString = sw.Value<string>("timestamp");
            var timestamp = DateTimeFormats.TimestampExportFormat.Parse(timestampString);
            if (!timestamp.Success)
                throw new ImportException($"Switch timestamp {timestampString} is not an valid timestamp.");

            // Don't import duplicate switches
            if (existingTimestamps.Contains(timestamp.Value)) continue;

            // Validate before any row data is written, and report the limit actually enforced.
            var members = sw.Value<JArray>("members");
            if (members.Count > Limits.MaxSwitchMemberCount)
                throw new ImportException(
                    $"Switch with timestamp {timestampString} contains too many members ({members.Count} > {Limits.MaxSwitchMemberCount}).");

            // The member-import pass below maps rows back to the file by timestamp, so a
            // timestamp repeated within the file is imported once; the first occurrence wins.
            if (!importedSwitches.TryAdd(timestamp.Value, members)) continue;

            // Otherwise, write to importer
            await importer.StartRowAsync();
            await importer.WriteAsync(_system.Id.Value, NpgsqlDbType.Integer);
            await importer.WriteAsync(timestamp.Value, NpgsqlDbType.Timestamp);
        }

        // Commit the import
        await importer.CompleteAsync();
    }

    // Now, fetch all the switches we just added (so, now we get their IDs too)
    // IDs are sequential, so any ID in this system, with a switch ID > the last max, will be one we just added
    var justAddedSwitches = await _conn.QueryAsync<PKSwitch>(
        "select * from switches where system = @System and id > @LastSwitchId",
        new { System = _system.Id, LastSwitchId = lastSwitchId?.Value ?? -1 });

    // Lastly, import the switch members
    await using (var importer =
                 _conn.BeginBinaryImport("copy switch_members (switch, member) from stdin (format binary)"))
    {
        foreach (var justAddedSwitch in justAddedSwitches)
        {
            if (!importedSwitches.TryGetValue(justAddedSwitch.Timestamp, out var switchMembers))
                throw new Exception(
                    $"Found 'just-added' switch (by ID) with timestamp {justAddedSwitch.Timestamp}, but this did not correspond to a timestamp we just added a switch entry of! :/");

            // Timestamps are unique among the rows written above, so each maps to one member list.
            foreach (var memberIdentifier in switchMembers)
            {
                if (!_knownMemberIdentifiers.TryGetValue((string)memberIdentifier, out var memberId))
                    throw new Exception(
                        $"Attempted to import switch with member identifier {memberIdentifier} but could not find an entry in the id map for this! :/");

                await importer.StartRowAsync();
                await importer.WriteAsync(justAddedSwitch.Id.Value, NpgsqlDbType.Integer);
                await importer.WriteAsync(memberId.Value, NpgsqlDbType.Integer);
            }
        }

        await importer.CompleteAsync();
    }
}
}