wikijs-fork/server/core/queue.js
2018-11-18 16:38:00 -05:00

64 lines
1.7 KiB
JavaScript

const path = require('path')
const Bull = require('bull')
const Promise = require('bluebird')
const _ = require('lodash')
/* global WIKI */
module.exports = {
job: {},
init() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
this.job[queueName] = new Bull(queueName, {
prefix: `queue`,
redis: WIKI.config.redis
})
if (queueParams.concurrency > 0) {
this.job[queueName].process(queueParams.concurrency, path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
} else {
this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
}
})
return this
},
start() {
_.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
if (queueParams.onInit) {
this.job[queueName].add({}, {
removeOnComplete: true
})
}
if (queueParams.cron) {
this.job[queueName].add({}, {
repeat: { cron: queueParams.cron },
removeOnComplete: true
})
}
})
},
async quit() {
for (const queueName in this.job) {
await this.job[queueName].close()
}
},
async clean() {
return Promise.each(_.keys(WIKI.data.jobs), queueName => {
return new Promise((resolve, reject) => {
let keyStream = WIKI.redis.scanStream({
match: `queue:${queueName}:*`
})
keyStream.on('data', resultKeys => {
if (resultKeys.length > 0) {
WIKI.redis.del(resultKeys)
}
})
keyStream.on('end', resolve)
})
}).then(() => {
WIKI.logger.info('Purging old queue jobs: [ OK ]')
}).return(true).catch(err => {
WIKI.logger.error(err)
})
}
}