feat: scheduler storage sync

Nick
2019-02-17 21:48:48 -05:00
parent aa27554bc7
commit 7e458f98b4
9 changed files with 159 additions and 127 deletions

View File

@@ -57,9 +57,6 @@ jobs:
syncGraphUpdates:
onInit: true
schedule: P1D
syncStorage:
onInit: true
schedule: storage.syncInterval
groups:
defaultPermissions:
- 'manage:pages'
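The syncStorage entry leaves this static jobs list because, as the scheduler and storage hunks below show, the storage sync job is now registered dynamically per target from Storage.initTargets(). Both P1D and the value behind storage.syncInterval are ISO 8601 duration strings; the scheduler turns them into a setTimeout delay with moment. A minimal sketch of that conversion, assuming only that moment is installed:

const moment = require('moment')

// 'P1D' is an ISO 8601 duration meaning one day.
const schedule = 'P1D'
const delayMs = moment.duration(schedule).asMilliseconds()
console.log(delayMs) // 86400000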

View File

@@ -1,84 +0,0 @@
const moment = require('moment')
const childProcess = require('child_process')
module.exports = class Job {
constructor({
name,
immediate = false,
schedule = 'P1D',
repeat = false,
worker = false
}) {
this.finished = Promise.resolve()
this.name = name
this.immediate = immediate
this.schedule = moment.duration(schedule)
this.repeat = repeat
this.worker = worker
}
/**
* Start Job
*
* @param {Object} data Job Data
*/
start(data) {
if (this.immediate) {
this.invoke(data)
} else {
this.queue(data)
}
}
/**
* Queue the next job run according to the wait duration
*
* @param {Object} data Job Data
*/
queue(data) {
this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
}
/**
* Run the actual job
*
* @param {Object} data Job Data
*/
async invoke(data) {
try {
if (this.worker) {
const proc = childProcess.fork(`server/core/worker.js`, [
`--job=${this.name}`,
`--data=${data}`
], {
cwd: WIKI.ROOTPATH
})
this.finished = new Promise((resolve, reject) => {
proc.on('exit', (code, signal) => {
if (code === 0) {
resolve()
} else {
reject(signal)
}
proc.kill()
})
})
} else {
this.finished = require(`../jobs/${this.name}`)(data)
}
await this.finished
} catch (err) {
WIKI.logger.warn(err)
}
if (this.repeat) {
this.queue(data)
}
}
/**
* Stop any future job invocation from occurring
*/
stop() {
clearTimeout(this.timeout)
}
}
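This standalone Job class file is deleted; the same class is inlined into the scheduler module in the next file. For reference, a minimal usage sketch of the class as defined above (the job name and data are hypothetical, not part of this commit):

const Job = require('./job')

// Hypothetical job: runs ../jobs/purge-uploads.js every hour, starting
// after the first hour rather than immediately.
const job = new Job({
  name: 'purge-uploads',
  immediate: false,
  schedule: 'PT1H',
  repeat: true,
  worker: false
})

job.start({ olderThan: '30d' }) // queues the first invocation via setTimeout
job.stop()                      // clears the pending timeout, e.g. on shutdown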

View File

@@ -1,9 +1,93 @@
const Job = require('./job')
const moment = require('moment')
const childProcess = require('child_process')
const _ = require('lodash')
const configHelper = require('../helpers/config')
/* global WIKI */
class Job {
constructor({
name,
immediate = false,
schedule = 'P1D',
repeat = false,
worker = false
}) {
this.finished = Promise.resolve()
this.name = name
this.immediate = immediate
this.schedule = moment.duration(schedule)
this.repeat = repeat
this.worker = worker
}
/**
* Start Job
*
* @param {Object} data Job Data
*/
start(data) {
if (this.immediate) {
this.invoke(data)
} else {
this.queue(data)
}
}
/**
* Queue the next job run according to the wait duration
*
* @param {Object} data Job Data
*/
queue(data) {
this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
}
/**
* Run the actual job
*
* @param {Object} data Job Data
*/
async invoke(data) {
try {
if (this.worker) {
const proc = childProcess.fork(`server/core/worker.js`, [
`--job=${this.name}`,
`--data=${data}`
], {
cwd: WIKI.ROOTPATH
})
this.finished = new Promise((resolve, reject) => {
proc.on('exit', (code, signal) => {
if (code === 0) {
resolve()
} else {
reject(signal)
}
proc.kill()
})
})
} else {
this.finished = require(`../jobs/${this.name}`)(data)
}
await this.finished
} catch (err) {
WIKI.logger.warn(err)
}
if (this.repeat) {
this.queue(data)
}
}
/**
* Stop any future job invocation from occurring
*/
stop() {
clearTimeout(this.timeout)
}
}
module.exports = {
jobs: [],
init() {
@@ -11,13 +95,13 @@ module.exports = {
},
start() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams : _.get(WIKI.config, queueParams.schedule)
// this.registerJob({
// name: _.kebabCase(queueName),
// immediate: queueParams.onInit,
// schedule: schedule,
// repeat: true
// })
const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : _.get(WIKI.config, queueParams.schedule)
this.registerJob({
name: _.kebabCase(queueName),
immediate: queueParams.onInit,
schedule: schedule,
repeat: true
})
})
},
registerJob(opts, data) {

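The start() hunk above also fixes a bug: the schedule passed to registerJob is either a literal ISO 8601 duration (e.g. P1D) or a dotted path into WIKI.config (e.g. storage.syncInterval), and the old code passed the whole queueParams object instead of queueParams.schedule. The configHelper.isValidDurationString helper that makes this distinction is not part of this diff; the following is only an assumed sketch of such a check, not the project's actual code:

// Assumed shape of configHelper.isValidDurationString -- not from this commit.
module.exports = {
  isValidDurationString (val) {
    // Accepts ISO 8601 durations such as P1D, PT15M or P1DT12H30M;
    // rejects config paths such as storage.syncInterval.
    return typeof val === 'string' &&
      /^P(?:\d+Y)?(?:\d+M)?(?:\d+W)?(?:\d+D)?(?:T(?:\d+H)?(?:\d+M)?(?:\d+S)?)?$/.test(val)
  }
}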
View File

@@ -1,18 +1,20 @@
const _ = require('lodash')
/* global WIKI */
module.exports = async ({ target }) => {
WIKI.logger.info(`Syncing with storage provider ${job.data.target.title}...`)
module.exports = async (targetKey) => {
WIKI.logger.info(`Syncing with storage target ${targetKey}...`)
// try {
// const target = require(`../modules/storage/${job.data.target.key}/storage.js`)
// target[job.data.event].call({
// config: job.data.target.config,
// mode: job.data.target.mode,
// page: job.data.page
// })
// WIKI.logger.info(`Syncing with storage provider ${job.data.target.title}: [ COMPLETED ]`)
// } catch (err) {
// WIKI.logger.error(`Syncing with storage provider ${job.data.target.title}: [ FAILED ]`)
// WIKI.logger.error(err.message)
// }
try {
const target = _.find(WIKI.models.storage.targets, ['key', targetKey])
if (target) {
await target.fn.sync()
WIKI.logger.info(`Syncing with storage target ${targetKey}: [ COMPLETED ]`)
} else {
throw new Error('Invalid storage target. Unable to perform sync.')
}
} catch (err) {
WIKI.logger.error(`Syncing with storage target ${targetKey}: [ FAILED ]`)
WIKI.logger.error(err.message)
}
}
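The scheduler invokes this module as require(`../jobs/${this.name}`)(data), where data is the target key passed through registerJob(..., target.key) in the storage model below. A quick way to exercise the job in isolation is to stub the WIKI global and register one fake target; the 'disk' key and the stub below are hypothetical, not part of this commit:

// Minimal stand-in for the WIKI global so the job can run outside the server.
global.WIKI = {
  logger: console,
  models: {
    storage: {
      targets: [
        { key: 'disk', fn: { async sync () { /* push pending changes */ } } }
      ]
    }
  }
}

const syncStorage = require('./sync-storage')

syncStorage('disk')    // -> Syncing with storage target disk: [ COMPLETED ]
syncStorage('dropbox') // -> Syncing with storage target dropbox: [ FAILED ]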

View File

@@ -7,8 +7,6 @@ const commonHelper = require('../helpers/common')
/* global WIKI */
let targets = []
/**
* Storage model
*/
@@ -104,22 +102,46 @@ module.exports = class Storage extends Model {
}
}
/**
* Initialize active storage targets
*/
static async initTargets() {
targets = await WIKI.models.storage.query().where('isEnabled', true).orderBy('key')
this.targets = await WIKI.models.storage.query().where('isEnabled', true).orderBy('key')
try {
for(let target of targets) {
// -> Stop and delete existing jobs
const prevjobs = _.remove(WIKI.scheduler.jobs, job => job.name === `sync-storage`)
if (prevjobs.length > 0) {
prevjobs.forEach(job => job.stop())
}
// -> Initialize targets
for(let target of this.targets) {
const targetDef = _.find(WIKI.data.storage, ['key', target.key])
target.fn = require(`../modules/storage/${target.key}/storage`)
target.fn.config = target.config
target.fn.mode = target.mode
try {
await target.fn.init()
// -> Save succeeded init state
await WIKI.models.storage.query().patch({
state: {
status: 'operational',
message: ''
}
}).where('key', target.key)
// -> Set recurring sync job
if (targetDef.schedule && target.syncInterval !== `P0D`) {
WIKI.scheduler.registerJob({
name: `sync-storage`,
immediate: false,
schedule: target.syncInterval,
repeat: true
}, target.key)
}
} catch (err) {
// -> Save initialization error
await WIKI.models.storage.query().patch({
state: {
status: 'error',
@@ -127,11 +149,6 @@ module.exports = class Storage extends Model {
}
}).where('key', target.key)
}
// if (target.schedule) {
// WIKI.scheduler.registerJob({
// name:
// }, target.fn.sync)
// }
}
} catch (err) {
WIKI.logger.warn(err)
@@ -141,7 +158,7 @@ module.exports = class Storage extends Model {
static async pageEvent({ event, page }) {
try {
for(let target of targets) {
for(let target of this.targets) {
await target.fn[event](page)
}
} catch (err) {
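initTargets() requires each enabled target's module from ../modules/storage/<key>/storage, assigns config and mode onto it, and then relies on it exposing init(), sync() and the page-event handlers that pageEvent() dispatches as target.fn[event](page). The exact contract is not spelled out in this hunk, so the skeleton below is an assumption about its shape (including the handler names), not code from the commit:

// Assumed skeleton for server/modules/storage/<key>/storage.js.
// initTargets() sets `config` and `mode` on this object before calling init().
module.exports = {
  async init () {
    // validate this.config, open connections, prepare the target
  },
  async sync () {
    // full synchronization, run by the recurring 'sync-storage' job
  },
  // Illustrative page-event handlers, looked up as target.fn[event](page):
  async created (page) { /* ... */ },
  async updated (page) { /* ... */ },
  async deleted (page) { /* ... */ }
}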