Major database refactor (again)
This commit is contained in:
		| @@ -2,7 +2,6 @@ | ||||
| using System.Collections.Generic; | ||||
| using System.Data; | ||||
| using System.IO; | ||||
| using System.Linq; | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using App.Metrics; | ||||
| @@ -207,11 +206,19 @@ namespace PluralKit.Core | ||||
|             await using var conn = await db.Obtain(); | ||||
|             await func(conn); | ||||
|         } | ||||
|          | ||||
|  | ||||
|         public static async Task<T> Execute<T>(this IDatabase db, Func<IPKConnection, Task<T>> func) | ||||
|         { | ||||
|             await using var conn = await db.Obtain(); | ||||
|             return await func(conn); | ||||
|         } | ||||
|          | ||||
|         public static async IAsyncEnumerable<T> Execute<T>(this IDatabase db, Func<IPKConnection, IAsyncEnumerable<T>> func) | ||||
|         { | ||||
|             await using var conn = await db.Obtain(); | ||||
|              | ||||
|             await foreach (var val in func(conn)) | ||||
|                 yield return val; | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -6,16 +6,16 @@ using Dapper; | ||||
| 
 | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public static class DatabaseFunctionsExt | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         public static Task<MessageContext> QueryMessageContext(this IPKConnection conn, ulong account, ulong guild, ulong channel) | ||||
|         public Task<MessageContext> GetMessageContext(IPKConnection conn, ulong account, ulong guild, ulong channel) | ||||
|         { | ||||
|             return conn.QueryFirstAsync<MessageContext>("message_context",  | ||||
|                 new { account_id = account, guild_id = guild, channel_id = channel },  | ||||
|                 commandType: CommandType.StoredProcedure); | ||||
|         }   | ||||
|          | ||||
|         public static Task<IEnumerable<ProxyMember>> QueryProxyMembers(this IPKConnection conn, ulong account, ulong guild) | ||||
|         public Task<IEnumerable<ProxyMember>> GetProxyMembers(IPKConnection conn, ulong account, ulong guild) | ||||
|         { | ||||
|             return conn.QueryAsync<ProxyMember>("proxy_members",  | ||||
|                 new { account_id = account, guild_id = guild },  | ||||
							
								
								
									
										83
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Group.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										83
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Group.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,83 @@ | ||||
| #nullable enable | ||||
| using System.Collections.Generic; | ||||
| using System.Linq; | ||||
| using System.Text; | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using Dapper; | ||||
|  | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         public Task<PKGroup?> GetGroupByName(IPKConnection conn, SystemId system, string name) => | ||||
|             conn.QueryFirstOrDefaultAsync<PKGroup?>("select * from groups where system = @System and lower(Name) = lower(@Name)", new {System = system, Name = name}); | ||||
|          | ||||
|         public Task<PKGroup?> GetGroupByHid(IPKConnection conn, string hid) => | ||||
|             conn.QueryFirstOrDefaultAsync<PKGroup?>("select * from groups where hid = @hid", new {hid = hid.ToLowerInvariant()}); | ||||
|          | ||||
|         public Task<int> GetGroupMemberCount(IPKConnection conn, GroupId id, PrivacyLevel? privacyFilter = null) | ||||
|         { | ||||
|             var query = new StringBuilder("select count(*) from group_members"); | ||||
|             if (privacyFilter != null) | ||||
|                 query.Append(" inner join members on group_members.member_id = members.id"); | ||||
|             query.Append(" where group_members.group_id = @Id"); | ||||
|             if (privacyFilter != null) | ||||
|                 query.Append(" and members.member_visibility = @PrivacyFilter"); | ||||
|             return conn.QuerySingleOrDefaultAsync<int>(query.ToString(), new {Id = id, PrivacyFilter = privacyFilter}); | ||||
|         } | ||||
|          | ||||
|         public IAsyncEnumerable<PKGroup> GetMemberGroups(IPKConnection conn, MemberId id) => | ||||
|             conn.QueryStreamAsync<PKGroup>( | ||||
|                 "select groups.* from group_members inner join groups on group_members.group_id = groups.id where group_members.member_id = @Id", | ||||
|                 new {Id = id}); | ||||
|          | ||||
|         public async Task<PKGroup> CreateGroup(IPKConnection conn, SystemId system, string name) | ||||
|         { | ||||
|             var group = await conn.QueryFirstAsync<PKGroup>( | ||||
|                 "insert into groups (hid, system, name) values (find_free_group_hid(), @System, @Name) returning *", | ||||
|                 new {System = system, Name = name}); | ||||
|             _logger.Information("Created group {GroupId} in system {SystemId}: {GroupName}", group.Id, system, name); | ||||
|             return group; | ||||
|         } | ||||
|  | ||||
|         public Task<PKGroup> UpdateGroup(IPKConnection conn, GroupId id, GroupPatch patch) | ||||
|         { | ||||
|             _logger.Information("Updated {GroupId}: {@GroupPatch}", id, patch); | ||||
|             var (query, pms) = patch.Apply(UpdateQueryBuilder.Update("groups", "id = @id")) | ||||
|                 .WithConstant("id", id) | ||||
|                 .Build("returning *"); | ||||
|             return conn.QueryFirstAsync<PKGroup>(query, pms); | ||||
|         } | ||||
|  | ||||
|         public Task DeleteGroup(IPKConnection conn, GroupId group) | ||||
|         { | ||||
|             _logger.Information("Deleted {GroupId}", group); | ||||
|             return conn.ExecuteAsync("delete from groups where id = @Id", new {Id = @group}); | ||||
|         } | ||||
|  | ||||
|         public async Task AddMembersToGroup(IPKConnection conn, GroupId group, | ||||
|                                             IReadOnlyCollection<MemberId> members) | ||||
|         { | ||||
|             await using var w = | ||||
|                 conn.BeginBinaryImport("copy group_members (group_id, member_id) from stdin (format binary)"); | ||||
|             foreach (var member in members) | ||||
|             { | ||||
|                 await w.StartRowAsync(); | ||||
|                 await w.WriteAsync(group.Value); | ||||
|                 await w.WriteAsync(member.Value); | ||||
|             } | ||||
|  | ||||
|             await w.CompleteAsync(); | ||||
|             _logger.Information("Added members to {GroupId}: {MemberIds}", group, members); | ||||
|         } | ||||
|  | ||||
|         public Task RemoveMembersFromGroup(IPKConnection conn, GroupId group, | ||||
|                                            IReadOnlyCollection<MemberId> members) | ||||
|         { | ||||
|             _logger.Information("Removed members from {GroupId}: {MemberIds}", group, members); | ||||
|             return conn.ExecuteAsync("delete from group_members where group_id = @Group and member_id = any(@Members)", | ||||
|                 new {Group = @group, Members = members.ToArray()}); | ||||
|         } | ||||
|     } | ||||
| } | ||||
							
								
								
									
										53
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Guild.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Guild.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,53 @@ | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using Dapper; | ||||
|  | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         public Task UpsertGuild(IPKConnection conn, ulong guild, GuildPatch patch) | ||||
|         { | ||||
|             _logger.Information("Updated guild {GuildId}: {@GuildPatch}", guild, patch); | ||||
|             var (query, pms) = patch.Apply(UpdateQueryBuilder.Upsert("servers", "id")) | ||||
|                 .WithConstant("id", guild) | ||||
|                 .Build(); | ||||
|             return conn.ExecuteAsync(query, pms); | ||||
|         } | ||||
|          | ||||
|         public Task UpsertSystemGuild(IPKConnection conn, SystemId system, ulong guild, | ||||
|                                       SystemGuildPatch patch) | ||||
|         { | ||||
|             _logger.Information("Updated {SystemId} in guild {GuildId}: {@SystemGuildPatch}", system, guild, patch); | ||||
|             var (query, pms) = patch.Apply(UpdateQueryBuilder.Upsert("system_guild", "system, guild")) | ||||
|                 .WithConstant("system", system) | ||||
|                 .WithConstant("guild", guild) | ||||
|                 .Build(); | ||||
|             return conn.ExecuteAsync(query, pms); | ||||
|         } | ||||
|  | ||||
|         public Task UpsertMemberGuild(IPKConnection conn, MemberId member, ulong guild, | ||||
|                                       MemberGuildPatch patch) | ||||
|         { | ||||
|             _logger.Information("Updated {MemberId} in guild {GuildId}: {@MemberGuildPatch}", member, guild, patch); | ||||
|             var (query, pms) = patch.Apply(UpdateQueryBuilder.Upsert("member_guild", "member, guild")) | ||||
|                 .WithConstant("member", member) | ||||
|                 .WithConstant("guild", guild) | ||||
|                 .Build(); | ||||
|             return conn.ExecuteAsync(query, pms); | ||||
|         } | ||||
|          | ||||
|         public Task<GuildConfig> GetGuild(IPKConnection conn, ulong guild) => | ||||
|             conn.QueryFirstAsync<GuildConfig>("insert into servers (id) values (@guild) on conflict (id) do update set id = @guild returning *", new {guild}); | ||||
|  | ||||
|         public Task<SystemGuildSettings> GetSystemGuild(IPKConnection conn, ulong guild, SystemId system) => | ||||
|             conn.QueryFirstAsync<SystemGuildSettings>( | ||||
|                 "insert into system_guild (guild, system) values (@guild, @system) on conflict (guild, system) do update set guild = @guild, system = @system returning *",  | ||||
|                 new {guild, system}); | ||||
|  | ||||
|         public Task<MemberGuildSettings> GetMemberGuild(IPKConnection conn, ulong guild, MemberId member) => | ||||
|             conn.QueryFirstAsync<MemberGuildSettings>( | ||||
|                 "insert into member_guild (guild, member) values (@guild, @member) on conflict (guild, member) do update set guild = @guild, member = @member returning *", | ||||
|                 new {guild, member}); | ||||
|     } | ||||
| } | ||||
							
								
								
									
										47
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Member.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Member.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | ||||
| #nullable enable | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using Dapper; | ||||
|  | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         public Task<PKMember?> GetMember(IPKConnection conn, MemberId id) => | ||||
|             conn.QueryFirstOrDefaultAsync<PKMember?>("select * from members where id = @id", new {id}); | ||||
|          | ||||
|         public Task<PKMember?> GetMemberByHid(IPKConnection conn, string hid) =>  | ||||
|             conn.QuerySingleOrDefaultAsync<PKMember?>("select * from members where hid = @Hid", new { Hid = hid.ToLower() }); | ||||
|  | ||||
|         public Task<PKMember?> GetMemberByName(IPKConnection conn, SystemId system, string name) =>  | ||||
|             conn.QueryFirstOrDefaultAsync<PKMember?>("select * from members where lower(name) = lower(@Name) and system = @SystemID", new { Name = name, SystemID = system }); | ||||
|  | ||||
|         public Task<PKMember?> GetMemberByDisplayName(IPKConnection conn, SystemId system, string name) =>  | ||||
|             conn.QueryFirstOrDefaultAsync<PKMember?>("select * from members where lower(display_name) = lower(@Name) and system = @SystemID", new { Name = name, SystemID = system }); | ||||
|  | ||||
|         public async Task<PKMember> CreateMember(IPKConnection conn, SystemId id, string memberName) | ||||
|         { | ||||
|             var member = await conn.QueryFirstAsync<PKMember>( | ||||
|                 "insert into members (hid, system, name) values (find_free_member_hid(), @SystemId, @Name) returning *", | ||||
|                 new {SystemId = id, Name = memberName}); | ||||
|             _logger.Information("Created {MemberId} in {SystemId}: {MemberName}", | ||||
|                 member.Id, id, memberName); | ||||
|             return member; | ||||
|         } | ||||
|  | ||||
|         public Task<PKMember> UpdateMember(IPKConnection conn, MemberId id, MemberPatch patch) | ||||
|         { | ||||
|             _logger.Information("Updated {MemberId}: {@MemberPatch}", id, patch); | ||||
|             var (query, pms) = patch.Apply(UpdateQueryBuilder.Update("members", "id = @id")) | ||||
|                 .WithConstant("id", id) | ||||
|                 .Build("returning *"); | ||||
|             return conn.QueryFirstAsync<PKMember>(query, pms); | ||||
|         } | ||||
|  | ||||
|         public Task DeleteMember(IPKConnection conn, MemberId id) | ||||
|         { | ||||
|             _logger.Information("Deleted {MemberId}", id); | ||||
|             return conn.ExecuteAsync("delete from members where id = @Id", new {Id = id}); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,63 @@ | ||||
| using System.Collections.Generic; | ||||
| using System.Linq; | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using Dapper; | ||||
|  | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         public async Task AddMessage(IPKConnection conn, PKMessage msg) { | ||||
|             // "on conflict do nothing" in the (pretty rare) case of duplicate events coming in from Discord, which would lead to a DB error before | ||||
|             await conn.ExecuteAsync("insert into messages(mid, guild, channel, member, sender, original_mid) values(@Mid, @Guild, @Channel, @Member, @Sender, @OriginalMid) on conflict do nothing", msg); | ||||
|             _logger.Debug("Stored message {@StoredMessage} in channel {Channel}", msg, msg.Channel); | ||||
|         } | ||||
|          | ||||
|         public async Task<FullMessage> GetMessage(IPKConnection conn, ulong id) | ||||
|         { | ||||
|             FullMessage Mapper(PKMessage msg, PKMember member, PKSystem system) => | ||||
|                 new FullMessage {Message = msg, System = system, Member = member}; | ||||
|  | ||||
|             var result = await conn.QueryAsync<PKMessage, PKMember, PKSystem, FullMessage>( | ||||
|                 "select messages.*, members.*, systems.* from messages, members, systems where (mid = @Id or original_mid = @Id) and messages.member = members.id and systems.id = members.system", | ||||
|                 Mapper, new {Id = id}); | ||||
|             return result.FirstOrDefault(); | ||||
|         } | ||||
|  | ||||
|         public async Task DeleteMessage(IPKConnection conn, ulong id) | ||||
|         { | ||||
|             var rowCount = await conn.ExecuteAsync("delete from messages where mid = @Id", new {Id = id}); | ||||
|             if (rowCount > 0) | ||||
|                 _logger.Information("Deleted message {MessageId} from database", id); | ||||
|         } | ||||
|  | ||||
|         public async Task DeleteMessagesBulk(IPKConnection conn, IReadOnlyCollection<ulong> ids) | ||||
|         { | ||||
|             // Npgsql doesn't support ulongs in general - we hacked around it for plain ulongs but tbh not worth it for collections of ulong | ||||
|             // Hence we map them to single longs, which *are* supported (this is ok since they're Technically (tm) stored as signed longs in the db anyway) | ||||
|             var rowCount = await conn.ExecuteAsync("delete from messages where mid = any(@Ids)", | ||||
|                 new {Ids = ids.Select(id => (long) id).ToArray()}); | ||||
|             if (rowCount > 0) | ||||
|                 _logger.Information("Bulk deleted messages ({FoundCount} found) from database: {MessageIds}", rowCount, | ||||
|                     ids); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public class PKMessage | ||||
|     { | ||||
|         public ulong Mid { get; set; } | ||||
|         public ulong? Guild { get; set; } // null value means "no data" (ie. from before this field being added) | ||||
|         public ulong Channel { get; set; } | ||||
|         public MemberId Member { get; set; } | ||||
|         public ulong Sender { get; set; } | ||||
|         public ulong? OriginalMid { get; set; } | ||||
|     } | ||||
|      | ||||
|     public class FullMessage | ||||
|     { | ||||
|         public PKMessage Message; | ||||
|         public PKMember Member; | ||||
|         public PKSystem System; | ||||
|     } | ||||
| } | ||||
							
								
								
									
										236
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Switch.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										236
									
								
								PluralKit.Core/Database/Repository/ModelRepository.Switch.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,236 @@ | ||||
| using System.Collections.Generic; | ||||
| using System.Linq; | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using Dapper; | ||||
|  | ||||
| using NodaTime; | ||||
|  | ||||
| using NpgsqlTypes; | ||||
|  | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         public async Task AddSwitch(IPKConnection conn, SystemId system, IReadOnlyCollection<MemberId> members) | ||||
|         { | ||||
|             // Use a transaction here since we're doing multiple executed commands in one | ||||
|             await using var tx = await conn.BeginTransactionAsync(); | ||||
|  | ||||
|             // First, we insert the switch itself | ||||
|             var sw = await conn.QuerySingleAsync<PKSwitch>("insert into switches(system) values (@System) returning *", | ||||
|                 new {System = system}); | ||||
|  | ||||
|             // Then we insert each member in the switch in the switch_members table | ||||
|             await using (var w = conn.BeginBinaryImport("copy switch_members (switch, member) from stdin (format binary)")) | ||||
|             { | ||||
|                 foreach (var member in members) | ||||
|                 { | ||||
|                     await w.StartRowAsync(); | ||||
|                     await w.WriteAsync(sw.Id.Value, NpgsqlDbType.Integer); | ||||
|                     await w.WriteAsync(member.Value, NpgsqlDbType.Integer); | ||||
|                 } | ||||
|  | ||||
|                 await w.CompleteAsync(); | ||||
|             } | ||||
|  | ||||
|             // Finally we commit the tx, since the using block will otherwise rollback it | ||||
|             await tx.CommitAsync(); | ||||
|  | ||||
|             _logger.Information("Created {SwitchId} in {SystemId}: {Members}", sw.Id, system, members); | ||||
|         } | ||||
|  | ||||
|         public async Task MoveSwitch(IPKConnection conn, SwitchId id, Instant time) | ||||
|         { | ||||
|             await conn.ExecuteAsync("update switches set timestamp = @Time where id = @Id", | ||||
|                 new {Time = time, Id = id}); | ||||
|  | ||||
|             _logger.Information("Updated {SwitchId} timestamp: {SwitchTimestamp}", id, time); | ||||
|         } | ||||
|  | ||||
|         public async Task DeleteSwitch(IPKConnection conn, SwitchId id) | ||||
|         { | ||||
|             await conn.ExecuteAsync("delete from switches where id = @Id", new {Id = id}); | ||||
|             _logger.Information("Deleted {Switch}", id); | ||||
|         } | ||||
|  | ||||
|         public async Task DeleteAllSwitches(IPKConnection conn, SystemId system) | ||||
|         { | ||||
|             await conn.ExecuteAsync("delete from switches where system = @Id", new {Id = system}); | ||||
|             _logger.Information("Deleted all switches in {SystemId}", system); | ||||
|         } | ||||
|  | ||||
|         public IAsyncEnumerable<PKSwitch> GetSwitches(IPKConnection conn, SystemId system) | ||||
|         { | ||||
|             // TODO: refactor the PKSwitch data structure to somehow include a hydrated member list | ||||
|             return conn.QueryStreamAsync<PKSwitch>( | ||||
|                 "select * from switches where system = @System order by timestamp desc", | ||||
|                 new {System = system}); | ||||
|         } | ||||
|  | ||||
|         public async Task<int> GetSwitchCount(IPKConnection conn, SystemId system) | ||||
|         { | ||||
|             return await conn.QuerySingleAsync<int>("select count(*) from switches where system = @Id", new { Id = system }); | ||||
|         } | ||||
|  | ||||
|         public async IAsyncEnumerable<SwitchMembersListEntry> GetSwitchMembersList(IPKConnection conn, | ||||
|             SystemId system, Instant start, Instant end) | ||||
|         { | ||||
|             // Wrap multiple commands in a single transaction for performance | ||||
|             await using var tx = await conn.BeginTransactionAsync(); | ||||
|  | ||||
|             // Find the time of the last switch outside the range as it overlaps the range | ||||
|             // If no prior switch exists, the lower bound of the range remains the start time | ||||
|             var lastSwitch = await conn.QuerySingleOrDefaultAsync<Instant>( | ||||
|                 @"SELECT COALESCE(MAX(timestamp), @Start) | ||||
|                         FROM switches | ||||
|                         WHERE switches.system = @System | ||||
|                         AND switches.timestamp < @Start", | ||||
|                 new {System = system, Start = start}); | ||||
|  | ||||
|             // Then collect the time and members of all switches that overlap the range | ||||
|             var switchMembersEntries = conn.QueryStreamAsync<SwitchMembersListEntry>( | ||||
|                 @"SELECT switch_members.member, switches.timestamp | ||||
|                         FROM switches | ||||
|                         LEFT JOIN switch_members | ||||
|                         ON switches.id = switch_members.switch | ||||
|                         WHERE switches.system = @System | ||||
|                         AND ( | ||||
| 	                        switches.timestamp >= @Start | ||||
| 	                        OR switches.timestamp = @LastSwitch | ||||
|                         ) | ||||
|                         AND switches.timestamp < @End | ||||
|                         ORDER BY switches.timestamp DESC", | ||||
|                 new {System = system, Start = start, End = end, LastSwitch = lastSwitch}); | ||||
|  | ||||
|             // Yield each value here | ||||
|             await foreach (var entry in switchMembersEntries) | ||||
|                 yield return entry; | ||||
|  | ||||
|             // Don't really need to worry about the transaction here, we're not doing any *writes* | ||||
|         } | ||||
|  | ||||
|         public IAsyncEnumerable<PKMember> GetSwitchMembers(IPKConnection conn, SwitchId sw) | ||||
|         { | ||||
|             return conn.QueryStreamAsync<PKMember>( | ||||
|                 "select * from switch_members, members where switch_members.member = members.id and switch_members.switch = @Switch order by switch_members.id", | ||||
|                 new {Switch = sw}); | ||||
|         } | ||||
|  | ||||
|         public async Task<PKSwitch> GetLatestSwitch(IPKConnection conn, SystemId system) => | ||||
|             // TODO: should query directly for perf | ||||
|             await GetSwitches(conn, system).FirstOrDefaultAsync(); | ||||
|  | ||||
|         public async Task<IEnumerable<SwitchListEntry>> GetPeriodFronters(IPKConnection conn, | ||||
|                                                                           SystemId system, Instant periodStart, | ||||
|                                                                           Instant periodEnd) | ||||
|         { | ||||
|             // TODO: IAsyncEnumerable-ify this one | ||||
|             // TODO: this doesn't belong in the repo | ||||
|  | ||||
|             // Returns the timestamps and member IDs of switches overlapping the range, in chronological (newest first) order | ||||
|             var switchMembers = await GetSwitchMembersList(conn, system, periodStart, periodEnd).ToListAsync(); | ||||
|  | ||||
|             // query DB for all members involved in any of the switches above and collect into a dictionary for future use | ||||
|             // this makes sure the return list has the same instances of PKMember throughout, which is important for the dictionary | ||||
|             // key used in GetPerMemberSwitchDuration below | ||||
|             var membersList = await conn.QueryAsync<PKMember>( | ||||
|                 "select * from members where id = any(@Switches)", // lol postgres specific `= any()` syntax | ||||
|                 new {Switches = switchMembers.Select(m => m.Member.Value).Distinct().ToList()}); | ||||
|             var memberObjects = membersList.ToDictionary(m => m.Id); | ||||
|  | ||||
|             // Initialize entries - still need to loop to determine the TimespanEnd below | ||||
|             var entries = | ||||
|                 from item in switchMembers | ||||
|                 group item by item.Timestamp | ||||
|                 into g | ||||
|                 select new SwitchListEntry | ||||
|                 { | ||||
|                     TimespanStart = g.Key, | ||||
|                     Members = g.Where(x => x.Member != default(MemberId)).Select(x => memberObjects[x.Member]) | ||||
|                         .ToList() | ||||
|                 }; | ||||
|  | ||||
|             // Loop through every switch that overlaps the range and add it to the output list | ||||
|             // end time is the *FOLLOWING* switch's timestamp - we cheat by working backwards from the range end, so no dates need to be compared | ||||
|             var endTime = periodEnd; | ||||
|             var outList = new List<SwitchListEntry>(); | ||||
|             foreach (var e in entries) | ||||
|             { | ||||
|                 // Override the start time of the switch if it's outside the range (only true for the "out of range" switch we included above) | ||||
|                 var switchStartClamped = e.TimespanStart < periodStart | ||||
|                     ? periodStart | ||||
|                     : e.TimespanStart; | ||||
|  | ||||
|                 outList.Add(new SwitchListEntry | ||||
|                 { | ||||
|                     Members = e.Members, TimespanStart = switchStartClamped, TimespanEnd = endTime | ||||
|                 }); | ||||
|  | ||||
|                 // next switch's end is this switch's start (we're working backward in time) | ||||
|                 endTime = e.TimespanStart; | ||||
|             } | ||||
|  | ||||
|             return outList; | ||||
|         } | ||||
|  | ||||
|         public async Task<FrontBreakdown> GetFrontBreakdown(IPKConnection conn, SystemId system, Instant periodStart, | ||||
|                                                             Instant periodEnd) | ||||
|         { | ||||
|             // TODO: this doesn't belong in the repo | ||||
|             var dict = new Dictionary<PKMember, Duration>(); | ||||
|  | ||||
|             var noFronterDuration = Duration.Zero; | ||||
|  | ||||
|             // Sum up all switch durations for each member | ||||
|             // switches with multiple members will result in the duration to add up to more than the actual period range | ||||
|  | ||||
|             var actualStart = periodEnd; // will be "pulled" down | ||||
|             var actualEnd = periodStart; // will be "pulled" up | ||||
|  | ||||
|             foreach (var sw in await GetPeriodFronters(conn, system, periodStart, periodEnd)) | ||||
|             { | ||||
|                 var span = sw.TimespanEnd - sw.TimespanStart; | ||||
|                 foreach (var member in sw.Members) | ||||
|                 { | ||||
|                     if (!dict.ContainsKey(member)) dict.Add(member, span); | ||||
|                     else dict[member] += span; | ||||
|                 } | ||||
|  | ||||
|                 if (sw.Members.Count == 0) noFronterDuration += span; | ||||
|  | ||||
|                 if (sw.TimespanStart < actualStart) actualStart = sw.TimespanStart; | ||||
|                 if (sw.TimespanEnd > actualEnd) actualEnd = sw.TimespanEnd; | ||||
|             } | ||||
|  | ||||
|             return new FrontBreakdown | ||||
|             { | ||||
|                 MemberSwitchDurations = dict, | ||||
|                 NoFronterDuration = noFronterDuration, | ||||
|                 RangeStart = actualStart, | ||||
|                 RangeEnd = actualEnd | ||||
|             }; | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     public struct SwitchListEntry | ||||
|     { | ||||
|         public ICollection<PKMember> Members; | ||||
|         public Instant TimespanStart; | ||||
|         public Instant TimespanEnd; | ||||
|     } | ||||
|  | ||||
|     public struct FrontBreakdown | ||||
|     { | ||||
|         public Dictionary<PKMember, Duration> MemberSwitchDurations; | ||||
|         public Duration NoFronterDuration; | ||||
|         public Instant RangeStart; | ||||
|         public Instant RangeEnd; | ||||
|     } | ||||
|      | ||||
|     public struct SwitchMembersListEntry | ||||
|     { | ||||
|         public MemberId Member; | ||||
|         public Instant Timestamp; | ||||
|     } | ||||
| } | ||||
							
								
								
									
										78
									
								
								PluralKit.Core/Database/Repository/ModelRepository.System.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										78
									
								
								PluralKit.Core/Database/Repository/ModelRepository.System.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,78 @@ | ||||
| #nullable enable | ||||
| using System.Collections.Generic; | ||||
| using System.Text; | ||||
| using System.Threading.Tasks; | ||||
|  | ||||
| using Dapper; | ||||
|  | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         public Task<PKSystem?> GetSystem(IPKConnection conn, SystemId id) => | ||||
|             conn.QueryFirstOrDefaultAsync<PKSystem?>("select * from systems where id = @id", new {id}); | ||||
|  | ||||
|         public Task<PKSystem?> GetSystemByAccount(IPKConnection conn, ulong accountId) => | ||||
|             conn.QuerySingleOrDefaultAsync<PKSystem?>( | ||||
|                 "select systems.* from systems, accounts where accounts.system = systems.id and accounts.uid = @Id", | ||||
|                 new {Id = accountId}); | ||||
|  | ||||
|         public Task<PKSystem?> GetSystemByHid(IPKConnection conn, string hid) => | ||||
|             conn.QuerySingleOrDefaultAsync<PKSystem?>("select * from systems where systems.hid = @Hid", | ||||
|                 new {Hid = hid.ToLower()}); | ||||
|  | ||||
|         public Task<IEnumerable<ulong>> GetSystemAccounts(IPKConnection conn, SystemId system) => | ||||
|             conn.QueryAsync<ulong>("select uid from accounts where system = @Id", new {Id = system}); | ||||
|  | ||||
|         public IAsyncEnumerable<PKMember> GetSystemMembers(IPKConnection conn, SystemId system) => | ||||
|             conn.QueryStreamAsync<PKMember>("select * from members where system = @SystemID", new {SystemID = system}); | ||||
|  | ||||
|         public Task<int> GetSystemMemberCount(IPKConnection conn, SystemId id, PrivacyLevel? privacyFilter = null) | ||||
|         { | ||||
|             var query = new StringBuilder("select count(*) from members where system = @Id"); | ||||
|             if (privacyFilter != null) | ||||
|                 query.Append($" and member_visibility = {(int) privacyFilter.Value}"); | ||||
|             return conn.QuerySingleAsync<int>(query.ToString(), new {Id = id}); | ||||
|         } | ||||
|  | ||||
|         public async Task<PKSystem> CreateSystem(IPKConnection conn, string? systemName = null) | ||||
|         { | ||||
|             var system = await conn.QuerySingleAsync<PKSystem>( | ||||
|                 "insert into systems (hid, name) values (find_free_system_hid(), @Name) returning *", | ||||
|                 new {Name = systemName}); | ||||
|             _logger.Information("Created {SystemId}", system.Id); | ||||
|             return system; | ||||
|         } | ||||
|  | ||||
|         public Task<PKSystem> UpdateSystem(IPKConnection conn, SystemId id, SystemPatch patch) | ||||
|         { | ||||
|             _logger.Information("Updated {SystemId}: {@SystemPatch}", id, patch); | ||||
|             var (query, pms) = patch.Apply(UpdateQueryBuilder.Update("systems", "id = @id")) | ||||
|                 .WithConstant("id", id) | ||||
|                 .Build("returning *"); | ||||
|             return conn.QueryFirstAsync<PKSystem>(query, pms); | ||||
|         } | ||||
|  | ||||
|         public async Task AddAccount(IPKConnection conn, SystemId system, ulong accountId) | ||||
|         { | ||||
|             // We have "on conflict do nothing" since linking an account when it's already linked to the same system is idempotent | ||||
|             // This is used in import/export, although the pk;link command checks for this case beforehand | ||||
|             await conn.ExecuteAsync("insert into accounts (uid, system) values (@Id, @SystemId) on conflict do nothing", | ||||
|                 new {Id = accountId, SystemId = system}); | ||||
|             _logger.Information("Linked account {UserId} to {SystemId}", accountId, system); | ||||
|         } | ||||
|  | ||||
|         public async Task RemoveAccount(IPKConnection conn, SystemId system, ulong accountId) | ||||
|         { | ||||
|             await conn.ExecuteAsync("delete from accounts where uid = @Id and system = @SystemId", | ||||
|                 new {Id = accountId, SystemId = system}); | ||||
|             _logger.Information("Unlinked account {UserId} from {SystemId}", accountId, system); | ||||
|         } | ||||
|  | ||||
|         public Task DeleteSystem(IPKConnection conn, SystemId id) | ||||
|         { | ||||
|             _logger.Information("Deleted {SystemId}", id); | ||||
|             return conn.ExecuteAsync("delete from systems where id = @Id", new {Id = id}); | ||||
|         } | ||||
|     } | ||||
| } | ||||
							
								
								
									
										15
									
								
								PluralKit.Core/Database/Repository/ModelRepository.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								PluralKit.Core/Database/Repository/ModelRepository.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,15 @@ | ||||
| using Serilog; | ||||
|  | ||||
| namespace PluralKit.Core | ||||
| { | ||||
|     public partial class ModelRepository | ||||
|     { | ||||
|         private readonly ILogger _logger; | ||||
|  | ||||
|         public ModelRepository(ILogger logger) | ||||
|         { | ||||
|             _logger = logger.ForContext<ILogger>() | ||||
|                 .ForContext("Elastic", "yes?"); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -64,7 +64,11 @@ namespace PluralKit.Core | ||||
|         protected override async ValueTask<DbTransaction> BeginDbTransactionAsync(IsolationLevel level, CancellationToken ct) => new PKTransaction(await Inner.BeginTransactionAsync(level, ct)); | ||||
|  | ||||
|         public override void Open() => throw SyncError(nameof(Open)); | ||||
|         public override void Close() => throw SyncError(nameof(Close)); | ||||
|         public override void Close() | ||||
|         { | ||||
|             // Don't throw SyncError here, Dapper calls sync Close() internally so that sucks | ||||
|             Inner.Close(); | ||||
|         } | ||||
|  | ||||
|         IDbTransaction IPKConnection.BeginTransaction() => throw SyncError(nameof(BeginTransaction)); | ||||
|         IDbTransaction IPKConnection.BeginTransaction(IsolationLevel level) => throw SyncError(nameof(BeginTransaction)); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user