feat: handle event propagation via DB (HA)
This commit is contained in:
		| @@ -28,6 +28,7 @@ defaults: | ||||
|       maxFileSize: 5242880 | ||||
|       maxFiles: 10 | ||||
|     offline: false | ||||
|     ha: false | ||||
|     # DB defaults | ||||
|     api: | ||||
|       isEnabled: false | ||||
|   | ||||
| @@ -17,6 +17,7 @@ const migrateFromBeta = require('../db/beta') | ||||
| module.exports = { | ||||
|   Objection, | ||||
|   knex: null, | ||||
|   listener: null, | ||||
|   /** | ||||
|    * Initialize DB | ||||
|    * | ||||
| @@ -182,5 +183,55 @@ module.exports = { | ||||
|       ...this, | ||||
|       ...models | ||||
|     } | ||||
|   }, | ||||
|   /** | ||||
|    * Subscribe to database LISTEN / NOTIFY for multi-instances events | ||||
|    */ | ||||
|   async subscribeToNotifications () { | ||||
|     const useHA = (WIKI.config.ha === true || WIKI.config.ha === 'true' || WIKI.config.ha === 1 || WIKI.config.ha === '1') | ||||
|     if (!useHA) { | ||||
|       return | ||||
|     } else if (WIKI.config.db.type !== 'postgres') { | ||||
|       WIKI.logger.warn(`Database engine doesn't support pub/sub. Will not handle concurrent instances: [ DISABLED ]`) | ||||
|       return | ||||
|     } | ||||
|  | ||||
|     const PGPubSub = require('pg-pubsub') | ||||
|  | ||||
|     this.listener = new PGPubSub(this.knex.client.connectionSettings, { | ||||
|       log (ev) { | ||||
|         WIKI.logger.debug(ev) | ||||
|       } | ||||
|     }) | ||||
|     this.listener.addChannel('wiki', payload => { | ||||
|       if (_.has(payload.event) && payload.source !== WIKI.INSTANCE_ID) { | ||||
|         WIKI.events.emit(payload.event, payload.value) | ||||
|       } | ||||
|     }) | ||||
|     WIKI.events.onAny(this.notifyViaDB) | ||||
|  | ||||
|     WIKI.logger.info(`High-Availability Listener initialized successfully: [ OK ]`) | ||||
|   }, | ||||
|   /** | ||||
|    * Unsubscribe from database LISTEN / NOTIFY | ||||
|    */ | ||||
|   async unsubscribeToNotifications () { | ||||
|     if (this.listener) { | ||||
|       WIKI.events.offAny(this.notifyViaDB) | ||||
|       this.listener.close() | ||||
|     } | ||||
|   }, | ||||
|   /** | ||||
|    * Publish event via database NOTIFY | ||||
|    * | ||||
|    * @param {string} event Event fired | ||||
|    * @param {object} value Payload of the event | ||||
|    */ | ||||
|   notifyViaDB (event, value) { | ||||
|     this.listener.publish('wiki', { | ||||
|       source: WIKI.INSTANCE_ID, | ||||
|       event, | ||||
|       value | ||||
|     }) | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| const _ = require('lodash') | ||||
| const EventEmitter = require('events') | ||||
| const EventEmitter = require('eventemitter2').EventEmitter2 | ||||
|  | ||||
| /* global WIKI */ | ||||
|  | ||||
| @@ -37,6 +37,7 @@ module.exports = { | ||||
|       WIKI.servers = require('./servers') | ||||
|       WIKI.sideloader = require('./sideloader').init() | ||||
|       WIKI.events = new EventEmitter() | ||||
|       await WIKI.models.subscribeToNotifications() | ||||
|     } catch (err) { | ||||
|       WIKI.logger.error(err) | ||||
|       process.exit(1) | ||||
| @@ -91,5 +92,21 @@ module.exports = { | ||||
|       WIKI.logger.warn(err) | ||||
|       WIKI.telemetry.sendError(err) | ||||
|     }) | ||||
|   }, | ||||
|   /** | ||||
|    * Graceful shutdown | ||||
|    */ | ||||
|   async shutdown () { | ||||
|     if (WIKI.models) { | ||||
|       await WIKI.models.unsubscribeToNotifications() | ||||
|       await WIKI.models.knex.client.pool.destroy() | ||||
|       await WIKI.models.knex.destroy() | ||||
|     } | ||||
|     if (WIKI.scheduler) { | ||||
|       WIKI.scheduler.stop() | ||||
|     } | ||||
|     if (WIKI.servers) { | ||||
|       await WIKI.servers.stopServers() | ||||
|     } | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -4,11 +4,13 @@ | ||||
| // =========================================== | ||||
|  | ||||
| const path = require('path') | ||||
| const nanoid = require('nanoid') | ||||
|  | ||||
| let WIKI = { | ||||
|   IS_DEBUG: process.env.NODE_ENV === 'development', | ||||
|   IS_MASTER: true, | ||||
|   ROOTPATH: process.cwd(), | ||||
|   INSTANCE_ID: nanoid(10), | ||||
|   SERVERPATH: path.join(process.cwd(), 'server'), | ||||
|   Error: require('./helpers/error'), | ||||
|   configSvc: require('./core/config'), | ||||
|   | ||||
		Reference in New Issue
	
	Block a user