2020-06-13 17:15:29 +00:00
using System.Collections.Generic ;
2019-08-14 05:16:48 +00:00
using System.Linq ;
using System.Threading.Tasks ;
2019-12-21 20:42:06 +00:00
2019-08-14 05:16:48 +00:00
using Dapper ;
2020-02-12 14:16:19 +00:00
using NodaTime ;
2020-02-01 13:40:57 +00:00
2019-08-14 05:16:48 +00:00
using Serilog ;
2020-02-12 14:16:19 +00:00
namespace PluralKit.Core {
2019-10-26 17:45:30 +00:00
public class PostgresDataStore : IDataStore {
2020-06-13 17:36:43 +00:00
private IDatabase _conn ;
2019-08-14 05:16:48 +00:00
private ILogger _logger ;
2020-06-13 17:36:43 +00:00
public PostgresDataStore ( IDatabase conn , ILogger logger )
2019-08-14 05:16:48 +00:00
{
2019-10-26 17:45:30 +00:00
_conn = conn ;
_logger = logger ;
2019-08-14 05:16:48 +00:00
}
2020-06-13 14:03:57 +00:00
2019-10-26 17:45:30 +00:00
public async Task < PKSystem > CreateSystem ( string systemName = null ) {
2019-08-14 05:16:48 +00:00
PKSystem system ;
using ( var conn = await _conn . Obtain ( ) )
2020-06-12 22:43:48 +00:00
system = await conn . QuerySingleAsync < PKSystem > ( "insert into systems (hid, name) values (find_free_system_hid(), @Name) returning *" , new { Name = systemName } ) ;
2019-08-14 05:16:48 +00:00
_logger . Information ( "Created system {System}" , system . Id ) ;
2020-02-01 13:40:57 +00:00
// New system has no accounts, therefore nothing gets cached, therefore no need to invalidate caches right here
2019-08-14 05:16:48 +00:00
return system ;
}
2019-10-26 17:45:30 +00:00
public async Task AddAccount ( PKSystem system , ulong accountId ) {
2019-08-14 05:16:48 +00:00
// We have "on conflict do nothing" since linking an account when it's already linked to the same system is idempotent
// This is used in import/export, although the pk;link command checks for this case beforehand
using ( var conn = await _conn . Obtain ( ) )
await conn . ExecuteAsync ( "insert into accounts (uid, system) values (@Id, @SystemId) on conflict do nothing" , new { Id = accountId , SystemId = system . Id } ) ;
_logger . Information ( "Linked system {System} to account {Account}" , system . Id , accountId ) ;
}
2019-10-26 17:45:30 +00:00
public async Task RemoveAccount ( PKSystem system , ulong accountId ) {
2019-08-14 05:16:48 +00:00
using ( var conn = await _conn . Obtain ( ) )
await conn . ExecuteAsync ( "delete from accounts where uid = @Id and system = @SystemId" , new { Id = accountId , SystemId = system . Id } ) ;
_logger . Information ( "Unlinked system {System} from account {Account}" , system . Id , accountId ) ;
}
2019-10-26 17:45:30 +00:00
public async Task < PKSystem > GetSystemByAccount ( ulong accountId ) {
2019-08-14 05:16:48 +00:00
using ( var conn = await _conn . Obtain ( ) )
return await conn . QuerySingleOrDefaultAsync < PKSystem > ( "select systems.* from systems, accounts where accounts.system = systems.id and accounts.uid = @Id" , new { Id = accountId } ) ;
}
2019-10-26 17:45:30 +00:00
public async Task < PKSystem > GetSystemByHid ( string hid ) {
2019-08-14 05:16:48 +00:00
using ( var conn = await _conn . Obtain ( ) )
return await conn . QuerySingleOrDefaultAsync < PKSystem > ( "select * from systems where systems.hid = @Hid" , new { Hid = hid . ToLower ( ) } ) ;
}
2019-10-26 17:45:30 +00:00
public async Task < IEnumerable < ulong > > GetSystemAccounts ( PKSystem system )
2019-08-14 05:16:48 +00:00
{
using ( var conn = await _conn . Obtain ( ) )
return await conn . QueryAsync < ulong > ( "select uid from accounts where system = @Id" , new { Id = system . Id } ) ;
}
2019-11-02 21:46:51 +00:00
public async Task DeleteAllSwitches ( PKSystem system )
{
using ( var conn = await _conn . Obtain ( ) )
await conn . ExecuteAsync ( "delete from switches where system = @Id" , system ) ;
}
2019-10-26 17:45:30 +00:00
public async Task < PKMember > GetMemberByHid ( string hid ) {
2019-08-14 05:16:48 +00:00
using ( var conn = await _conn . Obtain ( ) )
return await conn . QuerySingleOrDefaultAsync < PKMember > ( "select * from members where hid = @Hid" , new { Hid = hid . ToLower ( ) } ) ;
}
2019-10-26 17:45:30 +00:00
public async Task < PKMember > GetMemberByName ( PKSystem system , string name ) {
2019-08-14 05:16:48 +00:00
// QueryFirst, since members can (in rare cases) share names
using ( var conn = await _conn . Obtain ( ) )
return await conn . QueryFirstOrDefaultAsync < PKMember > ( "select * from members where lower(name) = lower(@Name) and system = @SystemID" , new { Name = name , SystemID = system . Id } ) ;
}
2020-06-22 11:06:14 +00:00
public async Task < PKMember > GetMemberByDisplayName ( PKSystem system , string name ) {
// QueryFirst, since members can (in rare cases) share display names
using ( var conn = await _conn . Obtain ( ) )
return await conn . QueryFirstOrDefaultAsync < PKMember > ( "select * from members where lower(display_name) = lower(@Name) and system = @SystemID" , new { Name = name , SystemID = system . Id } ) ;
}
2020-01-17 23:58:35 +00:00
public IAsyncEnumerable < PKMember > GetSystemMembers ( PKSystem system , bool orderByName )
{
var sql = "select * from members where system = @SystemID" ;
if ( orderByName ) sql + = " order by lower(name) asc" ;
return _conn . QueryStreamAsync < PKMember > ( sql , new { SystemID = system . Id } ) ;
2019-08-14 05:16:48 +00:00
}
2020-06-14 19:37:04 +00:00
public async Task AddMessage ( IPKConnection conn , ulong senderId , ulong guildId , ulong channelId , ulong postedMessageId , ulong triggerMessageId , MemberId proxiedMemberId ) {
2020-06-13 16:31:20 +00:00
// "on conflict do nothing" in the (pretty rare) case of duplicate events coming in from Discord, which would lead to a DB error before
await conn . ExecuteAsync ( "insert into messages(mid, guild, channel, member, sender, original_mid) values(@MessageId, @GuildId, @ChannelId, @MemberId, @SenderId, @OriginalMid) on conflict do nothing" , new {
MessageId = postedMessageId ,
GuildId = guildId ,
ChannelId = channelId ,
MemberId = proxiedMemberId ,
SenderId = senderId ,
OriginalMid = triggerMessageId
} ) ;
2019-08-14 05:16:48 +00:00
2020-06-12 18:29:50 +00:00
_logger . Debug ( "Stored message {Message} in channel {Channel}" , postedMessageId , channelId ) ;
2019-08-14 05:16:48 +00:00
}
2019-10-26 17:45:30 +00:00
public async Task < FullMessage > GetMessage ( ulong id )
2019-08-14 05:16:48 +00:00
{
using ( var conn = await _conn . Obtain ( ) )
2019-10-26 17:45:30 +00:00
return ( await conn . QueryAsync < PKMessage , PKMember , PKSystem , FullMessage > ( "select messages.*, members.*, systems.* from messages, members, systems where (mid = @Id or original_mid = @Id) and messages.member = members.id and systems.id = members.system" , ( msg , member , system ) = > new FullMessage
2019-08-14 05:16:48 +00:00
{
Message = msg ,
System = system ,
Member = member
} , new { Id = id } ) ) . FirstOrDefault ( ) ;
}
2019-10-26 17:45:30 +00:00
public async Task DeleteMessage ( ulong id ) {
2019-08-14 05:16:48 +00:00
using ( var conn = await _conn . Obtain ( ) )
if ( await conn . ExecuteAsync ( "delete from messages where mid = @Id" , new { Id = id } ) > 0 )
_logger . Information ( "Deleted message {Message}" , id ) ;
}
2020-06-11 21:20:46 +00:00
public async Task DeleteMessagesBulk ( IReadOnlyCollection < ulong > ids )
2019-08-14 05:16:48 +00:00
{
using ( var conn = await _conn . Obtain ( ) )
{
// Npgsql doesn't support ulongs in general - we hacked around it for plain ulongs but tbh not worth it for collections of ulong
// Hence we map them to single longs, which *are* supported (this is ok since they're Technically (tm) stored as signed longs in the db anyway)
var foundCount = await conn . ExecuteAsync ( "delete from messages where mid = any(@Ids)" , new { Ids = ids . Select ( id = > ( long ) id ) . ToArray ( ) } ) ;
if ( foundCount > 0 )
_logger . Information ( "Bulk deleted messages {Messages}, {FoundCount} found" , ids , foundCount ) ;
}
}
2020-06-15 23:15:59 +00:00
public async Task AddSwitch ( SystemId system , IEnumerable < PKMember > members )
2019-08-14 05:16:48 +00:00
{
// Use a transaction here since we're doing multiple executed commands in one
2020-06-13 16:31:20 +00:00
await using var conn = await _conn . Obtain ( ) ;
2020-06-13 17:14:42 +00:00
await using var tx = await conn . BeginTransactionAsync ( ) ;
2020-06-13 16:31:20 +00:00
// First, we insert the switch itself
var sw = await conn . QuerySingleAsync < PKSwitch > ( "insert into switches(system) values (@System) returning *" ,
2020-06-15 23:15:59 +00:00
new { System = system } ) ;
2019-08-14 05:16:48 +00:00
2020-06-13 16:31:20 +00:00
// Then we insert each member in the switch in the switch_members table
// TODO: can we parallelize this or send it in bulk somehow?
foreach ( var member in members )
{
await conn . ExecuteAsync (
"insert into switch_members(switch, member) values(@Switch, @Member)" ,
new { Switch = sw . Id , Member = member . Id } ) ;
}
2019-08-14 05:16:48 +00:00
2020-06-13 16:31:20 +00:00
// Finally we commit the tx, since the using block will otherwise rollback it
2020-06-13 17:14:42 +00:00
await tx . CommitAsync ( ) ;
2019-08-14 05:16:48 +00:00
2020-06-15 23:15:59 +00:00
_logger . Information ( "Registered switch {Switch} in system {System} with members {@Members}" , sw . Id , system , members . Select ( m = > m . Id ) ) ;
2019-08-14 05:16:48 +00:00
}
2020-06-15 23:15:59 +00:00
public IAsyncEnumerable < PKSwitch > GetSwitches ( SystemId system )
2019-08-14 05:16:48 +00:00
{
// TODO: refactor the PKSwitch data structure to somehow include a hydrated member list
// (maybe when we get caching in?)
2020-01-17 23:02:17 +00:00
return _conn . QueryStreamAsync < PKSwitch > (
"select * from switches where system = @System order by timestamp desc" ,
2020-06-15 23:15:59 +00:00
new { System = system } ) ;
2019-08-14 05:16:48 +00:00
}
2020-01-17 23:58:35 +00:00
public async Task < int > GetSwitchCount ( PKSystem system )
{
using var conn = await _conn . Obtain ( ) ;
return await conn . QuerySingleAsync < int > ( "select count(*) from switches where system = @Id" , system ) ;
}
public async IAsyncEnumerable < SwitchMembersListEntry > GetSwitchMembersList ( PKSystem system , Instant start , Instant end )
2019-10-05 20:08:27 +00:00
{
// Wrap multiple commands in a single transaction for performance
2020-06-13 16:31:20 +00:00
await using var conn = await _conn . Obtain ( ) ;
await using var tx = await conn . BeginTransactionAsync ( ) ;
2020-01-17 23:58:35 +00:00
// Find the time of the last switch outside the range as it overlaps the range
// If no prior switch exists, the lower bound of the range remains the start time
var lastSwitch = await conn . QuerySingleOrDefaultAsync < Instant > (
@ "SELECT COALESCE(MAX(timestamp), @Start)
2019-10-05 20:08:27 +00:00
FROM switches
WHERE switches . system = @System
AND switches . timestamp < @Start ",
2020-01-17 23:58:35 +00:00
new { System = system . Id , Start = start } ) ;
2019-10-05 20:08:27 +00:00
2020-01-17 23:58:35 +00:00
// Then collect the time and members of all switches that overlap the range
var switchMembersEntries = conn . QueryStreamAsync < SwitchMembersListEntry > (
@ "SELECT switch_members.member, switches.timestamp
2019-10-05 20:08:27 +00:00
FROM switches
2019-10-06 07:03:28 +00:00
LEFT JOIN switch_members
2019-10-05 20:08:27 +00:00
ON switches . id = switch_members . switch
WHERE switches . system = @System
AND (
switches . timestamp > = @Start
OR switches . timestamp = @LastSwitch
)
AND switches . timestamp < @End
ORDER BY switches . timestamp DESC ",
2020-01-17 23:58:35 +00:00
new { System = system . Id , Start = start , End = end , LastSwitch = lastSwitch } ) ;
2019-10-05 20:08:27 +00:00
2020-01-17 23:58:35 +00:00
// Yield each value here
await foreach ( var entry in switchMembersEntries )
yield return entry ;
// Don't really need to worry about the transaction here, we're not doing any *writes*
2019-10-05 20:08:27 +00:00
}
2020-01-17 23:02:17 +00:00
public IAsyncEnumerable < PKMember > GetSwitchMembers ( PKSwitch sw )
2019-08-14 05:16:48 +00:00
{
2020-01-17 23:02:17 +00:00
return _conn . QueryStreamAsync < PKMember > (
"select * from switch_members, members where switch_members.member = members.id and switch_members.switch = @Switch order by switch_members.id" ,
new { Switch = sw . Id } ) ;
2019-08-14 05:16:48 +00:00
}
2020-06-15 23:15:59 +00:00
public async Task < PKSwitch > GetLatestSwitch ( SystemId system ) = >
2020-01-17 23:02:17 +00:00
await GetSwitches ( system ) . FirstOrDefaultAsync ( ) ;
2019-08-14 05:16:48 +00:00
public async Task MoveSwitch ( PKSwitch sw , Instant time )
{
using ( var conn = await _conn . Obtain ( ) )
await conn . ExecuteAsync ( "update switches set timestamp = @Time where id = @Id" ,
new { Time = time , Id = sw . Id } ) ;
_logger . Information ( "Moved switch {Switch} to {Time}" , sw . Id , time ) ;
}
public async Task DeleteSwitch ( PKSwitch sw )
{
using ( var conn = await _conn . Obtain ( ) )
await conn . ExecuteAsync ( "delete from switches where id = @Id" , new { Id = sw . Id } ) ;
_logger . Information ( "Deleted switch {Switch}" ) ;
}
2019-10-26 17:45:30 +00:00
public async Task < IEnumerable < SwitchListEntry > > GetPeriodFronters ( PKSystem system , Instant periodStart , Instant periodEnd )
2019-08-14 05:16:48 +00:00
{
2020-01-17 23:58:35 +00:00
// TODO: IAsyncEnumerable-ify this one
2019-10-05 20:08:27 +00:00
// Returns the timestamps and member IDs of switches overlapping the range, in chronological (newest first) order
2020-01-17 23:58:35 +00:00
var switchMembers = await GetSwitchMembersList ( system , periodStart , periodEnd ) . ToListAsync ( ) ;
2019-08-14 05:16:48 +00:00
// query DB for all members involved in any of the switches above and collect into a dictionary for future use
// this makes sure the return list has the same instances of PKMember throughout, which is important for the dictionary
// key used in GetPerMemberSwitchDuration below
2020-06-14 19:37:04 +00:00
Dictionary < MemberId , PKMember > memberObjects ;
2019-08-14 05:16:48 +00:00
using ( var conn = await _conn . Obtain ( ) )
{
2019-10-05 20:08:27 +00:00
memberObjects = (
await conn . QueryAsync < PKMember > (
"select * from members where id = any(@Switches)" , // lol postgres specific `= any()` syntax
2020-06-24 11:59:08 +00:00
new { Switches = switchMembers . Select ( m = > m . Member . Value ) . Distinct ( ) . ToList ( ) } )
2020-02-12 14:16:19 +00:00
) . ToDictionary ( m = > m . Id ) ;
2019-08-14 05:16:48 +00:00
}
2019-10-05 20:08:27 +00:00
// Initialize entries - still need to loop to determine the TimespanEnd below
var entries =
from item in switchMembers
group item by item . Timestamp into g
select new SwitchListEntry
{
TimespanStart = g . Key ,
2020-06-14 19:37:04 +00:00
Members = g . Where ( x = > x . Member ! = default ( MemberId ) ) . Select ( x = > memberObjects [ x . Member ] ) . ToList ( )
2019-10-05 20:08:27 +00:00
} ;
2019-08-14 05:16:48 +00:00
2019-10-05 20:08:27 +00:00
// Loop through every switch that overlaps the range and add it to the output list
// end time is the *FOLLOWING* switch's timestamp - we cheat by working backwards from the range end, so no dates need to be compared
2019-08-14 05:16:48 +00:00
var endTime = periodEnd ;
2019-10-05 20:08:27 +00:00
var outList = new List < SwitchListEntry > ( ) ;
foreach ( var e in entries )
2019-08-14 05:16:48 +00:00
{
2019-10-05 20:08:27 +00:00
// Override the start time of the switch if it's outside the range (only true for the "out of range" switch we included above)
var switchStartClamped = e . TimespanStart < periodStart
? periodStart
: e . TimespanStart ;
2019-08-14 05:16:48 +00:00
outList . Add ( new SwitchListEntry
{
2019-10-05 20:08:27 +00:00
Members = e . Members ,
2019-08-14 05:16:48 +00:00
TimespanStart = switchStartClamped ,
TimespanEnd = endTime
} ) ;
2019-10-05 20:08:27 +00:00
// next switch's end is this switch's start (we're working backward in time)
endTime = e . TimespanStart ;
2019-08-14 05:16:48 +00:00
}
return outList ;
}
2019-10-26 17:45:30 +00:00
public async Task < FrontBreakdown > GetFrontBreakdown ( PKSystem system , Instant periodStart , Instant periodEnd )
2019-08-14 05:16:48 +00:00
{
var dict = new Dictionary < PKMember , Duration > ( ) ;
var noFronterDuration = Duration . Zero ;
// Sum up all switch durations for each member
// switches with multiple members will result in the duration to add up to more than the actual period range
var actualStart = periodEnd ; // will be "pulled" down
var actualEnd = periodStart ; // will be "pulled" up
2019-10-26 17:45:30 +00:00
foreach ( var sw in await GetPeriodFronters ( system , periodStart , periodEnd ) )
2019-08-14 05:16:48 +00:00
{
var span = sw . TimespanEnd - sw . TimespanStart ;
foreach ( var member in sw . Members )
{
if ( ! dict . ContainsKey ( member ) ) dict . Add ( member , span ) ;
else dict [ member ] + = span ;
}
if ( sw . Members . Count = = 0 ) noFronterDuration + = span ;
if ( sw . TimespanStart < actualStart ) actualStart = sw . TimespanStart ;
if ( sw . TimespanEnd > actualEnd ) actualEnd = sw . TimespanEnd ;
}
2019-10-26 17:45:30 +00:00
return new FrontBreakdown
2019-08-14 05:16:48 +00:00
{
MemberSwitchDurations = dict ,
NoFronterDuration = noFronterDuration ,
RangeStart = actualStart ,
RangeEnd = actualEnd
} ;
}
}
2019-04-19 18:48:37 +00:00
}