refactor: removed redis + new scheduler engine

Author: Nick
Date:   2019-02-13 17:20:46 -05:00
Parent: e90873e13d
Commit: cd3f88bad0
51 changed files with 274 additions and 735 deletions

server/core/cache.js (new file)

@@ -0,0 +1,7 @@
const NodeCache = require('node-cache')
module.exports = {
init() {
return new NodeCache()
}
}
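
The new cache module replaces Redis-backed caching with an in-process node-cache instance. A minimal usage sketch, assuming WIKI.cache holds the return value of init() as set up in the boot sequence below; the key name and TTL are illustrative, not part of this commit:

// 'nav:main' and the 300-second TTL are hypothetical values for illustration
WIKI.cache.set('nav:main', { items: [] }, 300) // keep for 5 minutes
const nav = WIKI.cache.get('nav:main')         // undefined once expired or never set
WIKI.cache.del('nav:main')                     // explicit invalidation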

server/core/job.js (new file)

@@ -0,0 +1,84 @@
const moment = require('moment')
const childProcess = require('child_process')
/* global WIKI */
module.exports = class Job {
constructor({
name,
immediate = false,
schedule = 'P1D',
repeat = false,
worker = false
}) {
this.finished = Promise.resolve()
this.name = name
this.immediate = immediate
this.schedule = moment.duration(schedule)
this.repeat = repeat
this.worker = worker
}
/**
* Start Job
*
* @param {Object} data Job Data
*/
start(data) {
if (this.immediate) {
this.invoke(data)
} else {
this.queue(data)
}
}
/**
* Queue the next job run according to the wait duration
*
* @param {Object} data Job Data
*/
queue(data) {
this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
}
/**
* Run the actual job
*
* @param {Object} data Job Data
*/
async invoke(data) {
try {
if (this.worker) {
const proc = childProcess.fork(`server/core/worker.js`, [
`--job=${this.name}`,
`--data=${data}`
], {
cwd: WIKI.ROOTPATH
})
this.finished = new Promise((resolve, reject) => {
proc.on('exit', (code, signal) => {
if (code === 0) {
resolve()
} else {
reject(signal)
}
proc.kill()
})
})
} else {
this.finished = require(`../jobs/${this.name}`)(data)
}
await this.finished
} catch (err) {
WIKI.logger.warn(err)
}
if (this.repeat) {
this.queue(data)
}
}
/**
* Stop any future job invocation from occurring
*/
stop() {
clearTimeout(this.timeout)
}
}
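
The Job class is self-contained, so it can also be driven directly. A minimal sketch, not taken from this commit; 'sample-job' is a hypothetical name that would have to exist as a module under server/jobs/:

const Job = require('./job')
const job = new Job({
  name: 'sample-job',   // resolved as server/jobs/sample-job.js (or forked via the worker entry point if worker: true)
  immediate: true,      // run once right away instead of waiting out the first interval
  schedule: 'PT15M',    // ISO 8601 duration, parsed with moment.duration()
  repeat: true          // queue() is called again after each invoke()
})
job.start({ dryRun: true })
// ...on shutdown:
job.stop()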

(modified file)

@@ -10,10 +10,10 @@ module.exports = {
WIKI.logger.info('=======================================')
WIKI.models = require('./db').init()
WIKI.redis = require('./redis').init()
WIKI.queue = require('./queue').init()
await this.preBootMaster()
await WIKI.models.onReady
await WIKI.configSvc.loadFromDb()
this.bootMaster()
},
/**
@@ -21,11 +21,10 @@ module.exports = {
*/
async preBootMaster() {
try {
await WIKI.models.onReady
await WIKI.configSvc.loadFromDb()
await WIKI.queue.clean()
await this.initTelemetry()
WIKI.cache = require('./cache').init()
WIKI.scheduler = require('./scheduler').init()
WIKI.events = new EventEmitter()
WIKI.redisSub = require('./redis').subscribe()
} catch (err) {
WIKI.logger.error(err)
process.exit(1)
@@ -40,7 +39,7 @@ module.exports = {
WIKI.logger.info('Starting setup wizard...')
require('../setup')()
} else {
await this.initTelemetry()
await this.preBootMaster()
await require('../master')()
this.postBootMaster()
}
@@ -62,7 +61,7 @@ module.exports = {
await WIKI.auth.activateStrategies()
await WIKI.models.storage.initTargets()
await WIKI.queue.start()
WIKI.scheduler.start()
},
/**
* Init Telemetry

(modified file)

@@ -27,15 +27,6 @@ module.exports = {
// Load current language + namespaces
this.refreshNamespaces(true)
// Listen for localization events
WIKI.events.on('localization', (action) => {
switch (action) {
case 'reload':
this.refreshNamespaces()
break
}
})
return this
},
/**

(deleted file)

@@ -1,63 +0,0 @@
const path = require('path')
const Bull = require('bull')
const Promise = require('bluebird')
const _ = require('lodash')
/* global WIKI */
module.exports = {
job: {},
init() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
this.job[queueName] = new Bull(queueName, {
prefix: `queue`,
redis: WIKI.config.redis
})
if (queueParams.concurrency > 0) {
this.job[queueName].process(queueParams.concurrency, path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
} else {
this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
}
})
return this
},
start() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
if (queueParams.onInit) {
this.job[queueName].add({}, {
removeOnComplete: true
})
}
if (queueParams.cron) {
this.job[queueName].add({}, {
repeat: { cron: queueParams.cron },
removeOnComplete: true
})
}
})
},
async quit() {
for (const queueName in this.job) {
await this.job[queueName].close()
}
},
async clean() {
return Promise.each(_.keys(WIKI.data.jobs), queueName => {
return new Promise((resolve, reject) => {
let keyStream = WIKI.redis.scanStream({
match: `queue:${queueName}:*`
})
keyStream.on('data', resultKeys => {
if (resultKeys.length > 0) {
WIKI.redis.del(resultKeys)
}
})
keyStream.on('end', resolve)
})
}).then(() => {
WIKI.logger.info('Purging old queue jobs: [ OK ]')
}).return(true).catch(err => {
WIKI.logger.error(err)
})
}
}

(deleted file)

@@ -1,36 +0,0 @@
const Redis = require('ioredis')
const { isPlainObject } = require('lodash')
/* global WIKI */
module.exports = {
init() {
if (isPlainObject(WIKI.config.redis)) {
let red = new Redis(WIKI.config.redis)
red.on('ready', () => {
WIKI.logger.info('Redis connection: [ OK ]')
})
red.on('error', () => {
WIKI.logger.error('Failed to connect to Redis instance!')
process.exit(1)
})
return red
} else {
WIKI.logger.error('Invalid Redis configuration!')
process.exit(1)
}
},
subscribe() {
let red = this.init()
red.on('message', (channel, msg) => {
WIKI.events.emit(channel, msg)
})
red.subscribe('localization', 'updates', (err, count) => {
if (err) {
WIKI.logger.error(err)
process.exit(1)
}
})
return red
}
}

server/core/scheduler.js (new file)

@@ -0,0 +1,34 @@
const Job = require('./job')
const _ = require('lodash')
/* global WIKI */
module.exports = {
jobs: [],
init() {
return this
},
start() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
this.registerJob({
name: _.kebabCase(queueName),
immediate: queueParams.onInit,
schedule: queueParams.schedule,
repeat: true
})
})
},
registerJob(opts, data) {
const job = new Job(opts)
job.start(data)
if (job.repeat) {
this.jobs.push(job)
}
return job
},
stop() {
this.jobs.forEach(job => {
job.stop()
})
}
}
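
scheduler.start() iterates over WIKI.data.jobs and maps each entry's onInit and schedule fields onto the Job options. A hedged sketch of the shape it expects; the key and values are illustrative only:

// hypothetical entry: 'sampleJob' is kebab-cased to 'sample-job' when registered
WIKI.data.jobs = {
  sampleJob: {
    onInit: true,      // becomes Job.immediate: fire once at startup
    schedule: 'PT15M'  // ISO 8601 duration between runs (job.js defaults to 'P1D')
  }
}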

(modified file)

@@ -15,15 +15,6 @@ module.exports = {
minimumNodeRequired: '10.12.0'
},
init() {
// Listen for updates events
WIKI.events.on('updates', (infoRaw) => {
try {
this.updates = JSON.parse(infoRaw)
} catch (err) {
WIKI.logger.warn('Failed to parse updates info.')
}
})
// Clear content cache
fs.emptyDir(path.join(WIKI.ROOTPATH, 'data/cache'))

(modified file)

@@ -10,9 +10,10 @@ let WIKI = {
global.WIKI = WIKI
WIKI.configSvc.init()
// ----------------------------------------
// Init Logger
// ----------------------------------------
WIKI.logger = require('./logger').init('JOB')
const args = require('yargs').argv
;(async () => {
await require(`../jobs/${args.job}`)(args.data)
process.exit(0)
})()
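
Both this entry point and Job.invoke() resolve the module under server/jobs/ by job name and call it with the job data, so a job module is expected to export a single async function. A minimal sketch of that shape; the file name and body are hypothetical:

// server/jobs/sample-job.js (hypothetical)
/* global WIKI */
module.exports = async (data) => {
  // a real job would do its work here; this only logs the payload it received
  WIKI.logger.info(`sample-job invoked with data: ${JSON.stringify(data)}`)
}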