feat: storage schedule + status

Nick
2019-02-17 01:32:35 -05:00
parent 040f840807
commit aa27554bc7
19 changed files with 313 additions and 69 deletions

View File

@@ -57,6 +57,9 @@ jobs:
syncGraphUpdates:
onInit: true
schedule: P1D
syncStorage:
onInit: true
schedule: storage.syncInterval
groups:
defaultPermissions:
- 'manage:pages'
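
Note the difference between the two job entries: 'P1D' is a literal ISO 8601 duration, while 'storage.syncInterval' is a dot-path that the scheduler change below resolves against WIKI.config, so the storage sync cadence follows whatever the admin saves. A sketch of the parsed descriptors once this YAML is loaded into WIKI.data.jobs (values copied from the file above; see the resolution sketch after the scheduler change):

// Sketch only – shape of WIKI.data.jobs after the YAML above is loaded.
const jobs = {
  syncGraphUpdates: { onInit: true, schedule: 'P1D' },            // literal ISO 8601 duration
  syncStorage: { onInit: true, schedule: 'storage.syncInterval' } // dot-path into WIKI.config
}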

View File

@@ -1,5 +1,6 @@
const Job = require('./job')
const _ = require('lodash')
const configHelper = require('../helpers/config')
/* global WIKI */
@@ -10,12 +11,13 @@ module.exports = {
},
start() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
this.registerJob({
name: _.kebabCase(queueName),
immediate: queueParams.onInit,
schedule: queueParams.schedule,
repeat: true
})
const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : _.get(WIKI.config, queueParams.schedule)
// this.registerJob({
// name: _.kebabCase(queueName),
// immediate: queueParams.onInit,
// schedule: schedule,
// repeat: true
// })
})
},
registerJob(opts, data) {

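The added resolution step is what makes the config-path schedule above work: a value that parses as an ISO 8601 duration is used directly, anything else is treated as a dot-path into WIKI.config. A minimal sketch of the same logic in isolation, assuming the admin saved a value such as 'PT6H' for storage.syncInterval (the saved value is illustrative):

const _ = require('lodash')
const configHelper = require('../helpers/config')
/* global WIKI */

// Mirror of the resolution in start(): literal durations pass through,
// config paths are looked up on WIKI.config.
function resolveSchedule (raw) {
  return configHelper.isValidDurationString(raw) ? raw : _.get(WIKI.config, raw)
}

resolveSchedule('P1D')                  // => 'P1D'
resolveSchedule('storage.syncInterval') // => 'PT6H' (whatever was saved in the admin UI)
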
View File

@@ -0,0 +1,15 @@
exports.up = knex => {
return knex.schema
.table('storage', table => {
table.string('syncInterval')
table.json('state')
})
}
exports.down = knex => {
return knex.schema
.table('storage', table => {
table.dropColumn('syncInterval')
table.dropColumn('state')
})
}
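
The new state column is JSON. Assuming the Storage model (an Objection.js Model, as shown further down) lists its JSON columns in jsonAttributes, the objects written by the resolvers below are stringified on write and parsed back on read automatically. A sketch of that declaration only; the actual model file is not part of the hunks shown here and may differ:

const { Model } = require('objection')

// Sketch only – illustrates the jsonAttributes declaration, not the full model.
class Storage extends Model {
  static get tableName () { return 'storage' }
  static get jsonAttributes () {
    return ['config', 'state'] // 'config' is assumed to already be a JSON column
  }
}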

View File

@@ -0,0 +1,5 @@
const { createRateLimitDirective } = require('graphql-rate-limit-directive')
module.exports = createRateLimitDirective({
keyGenerator: (directiveArgs, source, args, context, info) => `${context.req.ip}:${info.parentType}.${info.fieldName}`
})
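
The rate-limit directive moves into its own autoloaded file (see the graph/index.js change below). Its keyGenerator scopes every counter to the client IP plus the resolved field, so one client cannot drain another's quota. A small illustration of the key it produces; in a real request info.parentType is a GraphQLObjectType that stringifies to the type name, shown here as a plain string, and the IP and field are hypothetical:

const keyGenerator = (directiveArgs, source, args, context, info) =>
  `${context.req.ip}:${info.parentType}.${info.fieldName}`

keyGenerator({}, null, {}, { req: { ip: '203.0.113.7' } }, { parentType: 'StorageQuery', fieldName: 'status' })
// => '203.0.113.7:StorageQuery.status'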

View File

@@ -6,7 +6,7 @@ const autoload = require('auto-load')
const PubSub = require('graphql-subscriptions').PubSub
const { LEVEL, MESSAGE } = require('triple-beam')
const Transport = require('winston-transport')
const { createRateLimitTypeDef, createRateLimitDirective } = require('graphql-rate-limit-directive')
const { createRateLimitTypeDef } = require('graphql-rate-limit-directive')
/* global WIKI */
@@ -35,10 +35,7 @@ resolversObj.forEach(resolver => {
// Directives
let schemaDirectives = {
...autoload(path.join(WIKI.SERVERPATH, 'graph/directives')),
rateLimit: createRateLimitDirective({
keyGenerator: (directiveArgs, source, args, context, info) => `${context.req.ip}:${info.parentType}.${info.fieldName}`
})
...autoload(path.join(WIKI.SERVERPATH, 'graph/directives'))
}
// Live Trail Logger (admin)

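The inline createRateLimitDirective call can go away because graph/directives is already autoloaded: the new rateLimit.js lands in schemaDirectives under its filename. A sketch of the equivalence, assuming auto-load keys modules by file name:

const path = require('path')
const autoload = require('auto-load')
/* global WIKI */

const directives = autoload(path.join(WIKI.SERVERPATH, 'graph/directives'))
// directives.rateLimit is the class exported by graph/directives/rateLimit.js,
// i.e. the same directive that was previously constructed inline above.
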
View File

@@ -18,6 +18,9 @@ module.exports = {
return {
...targetInfo,
...tgt,
hasSchedule: (targetInfo.schedule !== false),
syncInterval: tgt.syncInterval || targetInfo.schedule || 'P0D',
syncIntervalDefault: targetInfo.schedule,
config: _.sortBy(_.transform(tgt.config, (res, value, key) => {
const configData = _.get(targetInfo.props, key, {})
res.push({
@@ -33,6 +36,18 @@ module.exports = {
if (args.filter) { targets = graphHelper.filter(targets, args.filter) }
if (args.orderBy) { targets = graphHelper.orderBy(targets, args.orderBy) }
return targets
},
async status(obj, args, context, info) {
let activeTargets = await WIKI.models.storage.query().where('isEnabled', true)
return activeTargets.map(tgt => {
const targetInfo = _.find(WIKI.data.storage, ['key', tgt.key]) || {}
return {
key: tgt.key,
title: targetInfo.title,
status: _.get(tgt, 'state.status', 'pending'),
message: _.get(tgt, 'state.message', 'Initializing...')
}
})
}
},
StorageMutation: {
@@ -42,10 +57,15 @@ module.exports = {
await WIKI.models.storage.query().patch({
isEnabled: tgt.isEnabled,
mode: tgt.mode,
syncInterval: tgt.syncInterval,
config: _.reduce(tgt.config, (result, value, key) => {
_.set(result, `${value.key}`, _.get(JSON.parse(value.value), 'v', null))
return result
}, {})
}, {}),
state: {
status: 'pending',
message: 'Initializing...'
}
}).where('key', tgt.key)
}
await WIKI.models.storage.initTargets()

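The new status resolver surfaces the per-target state that initTargets() (in the storage model below) records after each initialization attempt, falling back to 'pending' / 'Initializing...' for rows that have not been touched yet. An illustrative result for one enabled target whose init failed (key, title and message are made up):

// Illustrative shape of StorageQuery.status:
const exampleStatus = [
  {
    key: 'git',
    title: 'Git',
    status: 'error',        // one of 'pending', 'operational', 'error'
    message: 'Remote repository is unreachable'
  }
]
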
View File

@@ -19,6 +19,8 @@ type StorageQuery {
filter: String
orderBy: String
): [StorageTarget] @auth(requires: ["manage:system"])
status: [StorageStatus] @auth(requires: ["manage:system"])
}
# -----------------------------------------------
@@ -27,7 +29,7 @@ type StorageQuery {
type StorageMutation {
updateTargets(
targets: [StorageTargetInput]
targets: [StorageTargetInput]!
): DefaultResponse @auth(requires: ["manage:system"])
}
@@ -45,6 +47,9 @@ type StorageTarget {
website: String
supportedModes: [String]
mode: String
hasSchedule: Boolean!
syncInterval: String
syncIntervalDefault: String
config: [KeyValuePair]
}
@@ -52,5 +57,13 @@ input StorageTargetInput {
isEnabled: Boolean!
key: String!
mode: String!
syncInterval: String
config: [KeyValuePairInput]
}
type StorageStatus {
key: String!
title: String!
status: String!
message: String
}
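
A hedged example of exercising the new field from a client, assuming the root-level storage query field that wraps StorageQuery in the rest of the schema; the transport (Apollo client, plain fetch, ...) is omitted:

// Query string only – matches the StorageQuery / StorageStatus types above.
const statusQuery = `
  {
    storage {
      status {
        key
        title
        status
        message
      }
    }
  }
`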

View File

@@ -2,6 +2,8 @@
const _ = require('lodash')
const isoDurationReg = /^(-|\+)?P(?:([-+]?[0-9,.]*)Y)?(?:([-+]?[0-9,.]*)M)?(?:([-+]?[0-9,.]*)W)?(?:([-+]?[0-9,.]*)D)?(?:T(?:([-+]?[0-9,.]*)H)?(?:([-+]?[0-9,.]*)M)?(?:([-+]?[0-9,.]*)S)?)?$/
module.exports = {
/**
* Parse configuration value for environment vars
@@ -15,5 +17,9 @@ module.exports = {
/\$\(([A-Z0-9_]+)\)/g,
(fm, m) => { return process.env[m] }
)
},
isValidDurationString (val) {
return isoDurationReg.test(val)
}
}
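
isValidDurationString is what lets the scheduler tell literal schedules apart from config paths. A few examples of what the ISO 8601 duration regex accepts and rejects (import path matches the one used by the scheduler above):

const configHelper = require('../helpers/config')

configHelper.isValidDurationString('P1D')                  // true  – one day
configHelper.isValidDurationString('PT30M')                // true  – thirty minutes
configHelper.isValidDurationString('P0D')                  // true  – the zero-length syncInterval default
configHelper.isValidDurationString('storage.syncInterval') // false – treated as a config path instead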

View File

@@ -1,20 +1,18 @@
require('../core/worker')
/* global WIKI */
module.exports = async (job) => {
module.exports = async ({ target }) => {
WIKI.logger.info(`Syncing with storage provider ${target.title}...`)
try {
const target = require(`../modules/storage/${job.data.target.key}/storage.js`)
target[job.data.event].call({
config: job.data.target.config,
mode: job.data.target.mode,
page: job.data.page
})
WIKI.logger.info(`Syncing with storage provider ${job.data.target.title}: [ COMPLETED ]`)
} catch (err) {
WIKI.logger.error(`Syncing with storage provider ${job.data.target.title}: [ FAILED ]`)
WIKI.logger.error(err.message)
}
// try {
// const target = require(`../modules/storage/${job.data.target.key}/storage.js`)
// target[job.data.event].call({
// config: job.data.target.config,
// mode: job.data.target.mode,
// page: job.data.page
// })
// WIKI.logger.info(`Syncing with storage provider ${job.data.target.title}: [ COMPLETED ]`)
// } catch (err) {
// WIKI.logger.error(`Syncing with storage provider ${job.data.target.title}: [ FAILED ]`)
// WIKI.logger.error(err.message)
// }
}
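
The worker now receives a plain destructured payload instead of a Bull job object; its body is commented out while the scheduler rework above is still in flux. A rough sketch of how the reworked scheduler might invoke it once re-enabled, with the payload shape inferred from the commented-out code (path and values are illustrative):

// Sketch only – the final payload may also carry the event and page seen in the old job.data.
const syncStorage = require('./storage-sync') // hypothetical path to this worker

syncStorage({
  target: {
    key: 'git',
    title: 'Git',
    mode: 'sync',
    config: {}
  }
})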

View File

@@ -63,10 +63,15 @@ module.exports = class Storage extends Model {
key: target.key,
isEnabled: false,
mode: target.defaultMode || 'push',
syncInterval: target.schedule || 'P0D',
config: _.transform(target.props, (result, value, key) => {
_.set(result, key, value.default)
return result
}, {})
}, {}),
state: {
status: 'pending',
message: ''
}
})
} else {
const targetConfig = _.get(_.find(dbTargets, ['key', target.key]), 'config', {})
@@ -100,13 +105,28 @@ module.exports = class Storage extends Model {
}
static async initTargets() {
targets = await WIKI.models.storage.query().where('isEnabled', true)
targets = await WIKI.models.storage.query().where('isEnabled', true).orderBy('key')
try {
for(let target of targets) {
target.fn = require(`../modules/storage/${target.key}/storage`)
target.fn.config = target.config
target.fn.mode = target.mode
await target.fn.init()
try {
await target.fn.init()
await WIKI.models.storage.query().patch({
state: {
status: 'operational',
message: ''
}
}).where('key', target.key)
} catch (err) {
await WIKI.models.storage.query().patch({
state: {
status: 'error',
message: err.message
}
}).where('key', target.key)
}
// if (target.schedule) {
// WIKI.scheduler.registerJob({
// name: