feat: HA event handling + emitting
This commit is contained in:
@@ -281,6 +281,7 @@ module.exports = {
|
||||
async reloadGroups () {
|
||||
const groupsArray = await WIKI.models.groups.query()
|
||||
this.groups = _.keyBy(groupsArray, 'id')
|
||||
WIKI.auth.guest = await WIKI.models.users.getGuestUser()
|
||||
},
|
||||
|
||||
/**
|
||||
@@ -324,6 +325,7 @@ module.exports = {
|
||||
])
|
||||
|
||||
await WIKI.auth.activateStrategies()
|
||||
WIKI.events.outbound.emit('reloadAuthStrategies')
|
||||
|
||||
WIKI.logger.info('Regenerated certificates: [ COMPLETED ]')
|
||||
},
|
||||
@@ -356,5 +358,20 @@ module.exports = {
|
||||
await guestUser.$relatedQuery('groups').relate(guestGroup.id)
|
||||
|
||||
WIKI.logger.info('Guest user has been reset: [ COMPLETED ]')
|
||||
},
|
||||
|
||||
/**
|
||||
* Subscribe to HA propagation events
|
||||
*/
|
||||
subscribeToEvents() {
|
||||
WIKI.events.inbound.on('reloadGroups', () => {
|
||||
WIKI.auth.reloadGroups()
|
||||
})
|
||||
WIKI.events.inbound.on('reloadApiKeys', () => {
|
||||
WIKI.auth.reloadApiKeys()
|
||||
})
|
||||
WIKI.events.inbound.on('reloadAuthStrategies', () => {
|
||||
WIKI.auth.activateStrategies()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ module.exports = {
|
||||
* @param {Array} keys Array of keys to save
|
||||
* @returns Promise
|
||||
*/
|
||||
async saveToDb(keys) {
|
||||
async saveToDb(keys, propagate = true) {
|
||||
try {
|
||||
for (let key of keys) {
|
||||
let value = _.get(WIKI.config, key, null)
|
||||
@@ -107,6 +107,9 @@ module.exports = {
|
||||
await WIKI.models.settings.query().insert({ key, value })
|
||||
}
|
||||
}
|
||||
if (propagate) {
|
||||
WIKI.events.outbound.emit('reloadConfig')
|
||||
}
|
||||
} catch (err) {
|
||||
WIKI.logger.error(`Failed to save configuration to DB: ${err.message}`)
|
||||
return false
|
||||
@@ -119,5 +122,15 @@ module.exports = {
|
||||
*/
|
||||
async applyFlags() {
|
||||
WIKI.models.knex.client.config.debug = WIKI.config.flags.sqllog
|
||||
},
|
||||
|
||||
/**
|
||||
* Subscribe to HA propagation events
|
||||
*/
|
||||
subscribeToEvents() {
|
||||
WIKI.events.inbound.on('reloadConfig', async () => {
|
||||
await WIKI.configSvc.loadFromDb()
|
||||
await WIKI.configSvc.applyFlags()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -203,12 +203,22 @@ module.exports = {
|
||||
WIKI.logger.debug(ev)
|
||||
}
|
||||
})
|
||||
|
||||
// -> Outbound events handling
|
||||
|
||||
this.listener.addChannel('wiki', payload => {
|
||||
if (_.has(payload.event) && payload.source !== WIKI.INSTANCE_ID) {
|
||||
WIKI.events.emit(payload.event, payload.value)
|
||||
WIKI.logger.debug(`Received event ${payload.event} from instance ${payload.source}: [ OK ]`)
|
||||
WIKI.events.inbound.emit(payload.event, payload.value)
|
||||
}
|
||||
})
|
||||
WIKI.events.onAny(this.notifyViaDB)
|
||||
WIKI.events.outbound.onAny(this.notifyViaDB)
|
||||
|
||||
// -> Listen to inbound events
|
||||
|
||||
WIKI.auth.subscribeToEvents()
|
||||
WIKI.configSvc.subscribeToEvents()
|
||||
WIKI.models.pages.subscribeToEvents()
|
||||
|
||||
WIKI.logger.info(`High-Availability Listener initialized successfully: [ OK ]`)
|
||||
},
|
||||
@@ -217,7 +227,8 @@ module.exports = {
|
||||
*/
|
||||
async unsubscribeToNotifications () {
|
||||
if (this.listener) {
|
||||
WIKI.events.offAny(this.notifyViaDB)
|
||||
WIKI.events.outbound.offAny(this.notifyViaDB)
|
||||
WIKI.events.inbound.removeAllListeners()
|
||||
this.listener.close()
|
||||
}
|
||||
},
|
||||
|
||||
@@ -36,8 +36,10 @@ module.exports = {
|
||||
WIKI.scheduler = require('./scheduler').init()
|
||||
WIKI.servers = require('./servers')
|
||||
WIKI.sideloader = require('./sideloader').init()
|
||||
WIKI.events = new EventEmitter()
|
||||
await WIKI.models.subscribeToNotifications()
|
||||
WIKI.events = {
|
||||
inbound: new EventEmitter(),
|
||||
outbound: new EventEmitter()
|
||||
}
|
||||
} catch (err) {
|
||||
WIKI.logger.error(err)
|
||||
process.exit(1)
|
||||
@@ -77,6 +79,8 @@ module.exports = {
|
||||
await WIKI.models.searchEngines.initEngine()
|
||||
await WIKI.models.storage.initTargets()
|
||||
WIKI.scheduler.start()
|
||||
|
||||
await WIKI.models.subscribeToNotifications()
|
||||
},
|
||||
/**
|
||||
* Init Telemetry
|
||||
|
||||
@@ -67,8 +67,11 @@ module.exports = {
|
||||
*/
|
||||
async createApiKey (obj, args, context) {
|
||||
try {
|
||||
const key = await WIKI.models.apiKeys.createNewKey(args)
|
||||
await WIKI.auth.reloadApiKeys()
|
||||
WIKI.events.outbound.emit('reloadApiKeys')
|
||||
return {
|
||||
key: await WIKI.models.apiKeys.createNewKey(args),
|
||||
key,
|
||||
responseResult: graphHelper.generateSuccess('API Key created successfully')
|
||||
}
|
||||
} catch (err) {
|
||||
@@ -158,6 +161,7 @@ module.exports = {
|
||||
isRevoked: true
|
||||
})
|
||||
await WIKI.auth.reloadApiKeys()
|
||||
WIKI.events.outbound.emit('reloadApiKeys')
|
||||
return {
|
||||
responseResult: graphHelper.generateSuccess('API Key revoked successfully')
|
||||
}
|
||||
@@ -190,6 +194,7 @@ module.exports = {
|
||||
}).where('key', str.key)
|
||||
}
|
||||
await WIKI.auth.activateStrategies()
|
||||
WIKI.events.outbound.emit('reloadAuthStrategies')
|
||||
return {
|
||||
responseResult: graphHelper.generateSuccess('Strategies updated successfully')
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ module.exports = {
|
||||
isSystem: false
|
||||
})
|
||||
await WIKI.auth.reloadGroups()
|
||||
WIKI.events.outbound.emit('reloadGroups')
|
||||
return {
|
||||
responseResult: graphHelper.generateSuccess('Group created successfully.'),
|
||||
group
|
||||
@@ -62,6 +63,7 @@ module.exports = {
|
||||
async delete(obj, args) {
|
||||
await WIKI.models.groups.query().deleteById(args.id)
|
||||
await WIKI.auth.reloadGroups()
|
||||
WIKI.events.outbound.emit('reloadGroups')
|
||||
return {
|
||||
responseResult: graphHelper.generateSuccess('Group has been deleted.')
|
||||
}
|
||||
@@ -94,6 +96,7 @@ module.exports = {
|
||||
}).where('id', args.id)
|
||||
|
||||
await WIKI.auth.reloadGroups()
|
||||
WIKI.events.outbound.emit('reloadGroups')
|
||||
|
||||
return {
|
||||
responseResult: graphHelper.generateSuccess('Group has been updated.')
|
||||
|
||||
@@ -434,6 +434,7 @@ module.exports = {
|
||||
async flushCache(obj, args, context) {
|
||||
try {
|
||||
await WIKI.models.pages.flushCache()
|
||||
WIKI.events.outbound.emit('flushCache')
|
||||
return {
|
||||
responseResult: graphHelper.generateSuccess('Pages Cache has been flushed successfully.')
|
||||
}
|
||||
|
||||
@@ -205,6 +205,7 @@ module.exports = {
|
||||
|
||||
if (args.groupMode !== `NONE`) {
|
||||
await WIKI.auth.reloadGroups()
|
||||
WIKI.events.outbound.emit('reloadGroups')
|
||||
}
|
||||
|
||||
client.close()
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// ===========================================
|
||||
|
||||
const path = require('path')
|
||||
const nanoid = require('nanoid')
|
||||
const { nanoid } = require('nanoid')
|
||||
|
||||
let WIKI = {
|
||||
IS_DEBUG: process.env.NODE_ENV === 'development',
|
||||
|
||||
@@ -436,7 +436,8 @@ module.exports = class Page extends Model {
|
||||
localeCode: opts.destinationLocale,
|
||||
hash: destinationHash
|
||||
}).findById(page.id)
|
||||
await WIKI.models.pages.deletePageFromCache(page)
|
||||
await WIKI.models.pages.deletePageFromCache(page.hash)
|
||||
WIKI.events.outbound.emit('deletePageFromCache', page.hash)
|
||||
|
||||
// -> Rebuild page tree
|
||||
await WIKI.models.pages.rebuildTree()
|
||||
@@ -512,7 +513,8 @@ module.exports = class Page extends Model {
|
||||
|
||||
// -> Delete page
|
||||
await WIKI.models.pages.query().delete().where('id', page.id)
|
||||
await WIKI.models.pages.deletePageFromCache(page)
|
||||
await WIKI.models.pages.deletePageFromCache(page.hash)
|
||||
WIKI.events.outbound.emit('deletePageFromCache', page.hash)
|
||||
|
||||
// -> Rebuild page tree
|
||||
await WIKI.models.pages.rebuildTree()
|
||||
@@ -609,7 +611,8 @@ module.exports = class Page extends Model {
|
||||
affectedHashes = qryHashes.map(h => h.hash)
|
||||
}
|
||||
for (const hash of affectedHashes) {
|
||||
await WIKI.models.pages.deletePageFromCache({ hash })
|
||||
await WIKI.models.pages.deletePageFromCache(hash)
|
||||
WIKI.events.outbound.emit('deletePageFromCache', hash)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -853,4 +856,16 @@ module.exports = class Page extends Model {
|
||||
.replace(/\s\s+/g, ' ')
|
||||
.split(' ').filter(w => w.length > 1).join(' ').toLowerCase()
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to HA propagation events
|
||||
*/
|
||||
static subscribeToEvents() {
|
||||
WIKI.events.inbound.on('deletePageFromCache', hash => {
|
||||
WIKI.models.pages.deletePageFromCache(hash)
|
||||
})
|
||||
WIKI.events.inbound.on('flushCache', () => {
|
||||
WIKI.models.pages.flushCache()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,7 +186,7 @@ module.exports = () => {
|
||||
'telemetry',
|
||||
'theming',
|
||||
'title'
|
||||
])
|
||||
], false)
|
||||
|
||||
// Truncate tables (reset from previous failed install)
|
||||
await WIKI.models.locales.query().where('code', '!=', 'x').del()
|
||||
|
||||
Reference in New Issue
Block a user