Upgrade various store methods to IAsyncEnumerable

This commit is contained in:
Ske
2020-01-18 00:02:17 +01:00
parent 9a3355eb4b
commit 8a689ac0f2
7 changed files with 62 additions and 38 deletions

View File

@@ -25,6 +25,7 @@
<PackageReference Include="Serilog.Sinks.Async" Version="1.4.1-dev-00071" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0-dev-00834" />
<PackageReference Include="Serilog.Sinks.File" Version="4.1.0" />
<PackageReference Include="System.Interactive.Async" Version="4.0.0" />
</ItemGroup>
<ItemGroup>

View File

@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
@@ -289,7 +290,7 @@ namespace PluralKit {
/// Gets switches from a system.
/// </summary>
/// <returns>An enumerable of the *count* latest switches in the system, in latest-first order. May contain fewer elements than requested.</returns>
Task<IEnumerable<PKSwitch>> GetSwitches(PKSystem system, int count);
IAsyncEnumerable<PKSwitch> GetSwitches(PKSystem system);
/// <summary>
/// Gets the latest (temporally; closest to now) switch of a given system.
@@ -299,7 +300,7 @@ namespace PluralKit {
/// <summary>
/// Gets the members a given switch consists of.
/// </summary>
Task<IEnumerable<PKMember>> GetSwitchMembers(PKSwitch sw);
IAsyncEnumerable<PKMember> GetSwitchMembers(PKSwitch sw);
/// <summary>
/// Gets a list of fronters over a given period of time.
@@ -787,7 +788,9 @@ namespace PluralKit {
public async Task AddSwitchesBulk(PKSystem system, IEnumerable<ImportedSwitch> switches)
{
// Read existing switches to enforce unique timestamps
var priorSwitches = await GetSwitches(system);
var priorSwitches = new List<PKSwitch>();
await foreach (var sw in GetSwitches(system)) priorSwitches.Add(sw);
var lastSwitchId = priorSwitches.Any()
? priorSwitches.Max(x => x.Id)
: 0;
@@ -855,12 +858,13 @@ namespace PluralKit {
_logger.Information("Completed bulk import of switches for system {0}", system.Hid);
}
public async Task<IEnumerable<PKSwitch>> GetSwitches(PKSystem system, int count = 9999999)
public IAsyncEnumerable<PKSwitch> GetSwitches(PKSystem system)
{
// TODO: refactor the PKSwitch data structure to somehow include a hydrated member list
// (maybe when we get caching in?)
using (var conn = await _conn.Obtain())
return await conn.QueryAsync<PKSwitch>("select * from switches where system = @System order by timestamp desc limit @Count", new {System = system.Id, Count = count});
return _conn.QueryStreamAsync<PKSwitch>(
"select * from switches where system = @System order by timestamp desc",
new {System = system.Id});
}
public async Task<IEnumerable<SwitchMembersListEntry>> GetSwitchMembersList(PKSystem system, Instant start, Instant end)
@@ -899,15 +903,15 @@ namespace PluralKit {
}
}
public async Task<IEnumerable<PKMember>> GetSwitchMembers(PKSwitch sw)
public IAsyncEnumerable<PKMember> GetSwitchMembers(PKSwitch sw)
{
using (var conn = await _conn.Obtain())
return await conn.QueryAsync<PKMember>(
"select * from switch_members, members where switch_members.member = members.id and switch_members.switch = @Switch order by switch_members.id",
new {Switch = sw.Id});
return _conn.QueryStreamAsync<PKMember>(
"select * from switch_members, members where switch_members.member = members.id and switch_members.switch = @Switch order by switch_members.id",
new {Switch = sw.Id});
}
public async Task<PKSwitch> GetLatestSwitch(PKSystem system) => (await GetSwitches(system, 1)).FirstOrDefault();
public async Task<PKSwitch> GetLatestSwitch(PKSystem system) =>
await GetSwitches(system).FirstOrDefaultAsync();
public async Task MoveSwitch(PKSwitch sw, Instant time)
{

View File

@@ -669,4 +669,17 @@ namespace PluralKit
EventId = Guid.NewGuid();
}
}
public static class ConnectionUtils
{
public static async IAsyncEnumerable<T> QueryStreamAsync<T>(this DbConnectionFactory connFactory, string sql, object param)
{
using var conn = await connFactory.Obtain();
var reader = await conn.ExecuteReaderAsync(sql, param);
var parser = reader.GetRowParser<T>();
while (reader.Read())
yield return parser(reader);
}
}
}