feat: elasticsearch engine

This commit is contained in:
Nick 2019-03-23 18:18:36 -04:00
parent 342747c860
commit 81ff24b9b6
2 changed files with 299 additions and 20 deletions

View File

@ -4,7 +4,7 @@ description: Elasticsearch is a distributed, RESTful search and analytics engine
author: requarks.io author: requarks.io
logo: https://static.requarks.io/logo/elasticsearch.svg logo: https://static.requarks.io/logo/elasticsearch.svg
website: https://www.elastic.co/products/elasticsearch website: https://www.elastic.co/products/elasticsearch
isAvailable: false isAvailable: true
props: props:
apiVersion: apiVersion:
type: String type: String
@ -17,29 +17,37 @@ props:
- '6.4' - '6.4'
- '6.3' - '6.3'
default: '6.6' default: '6.6'
host: hosts:
type: String type: String
title: Host(s) title: Host(s)
hint: Comma-separated list of Elasticsearch hosts to connect to hint: Comma-separated list of Elasticsearch hosts to connect to. (including the port)
order: 2 order: 2
user: user:
type: String type: String
title: Username title: Username
hint: (Optional) Username to use if using the security feature from X-Pack
order: 3 order: 3
pass: pass:
type: String type: String
title: Password title: Password
hint: (Optional) Password to use if using the security feature from X-Pack
order: 4 order: 4
sniff: indexName:
type: String
title: Index Name
hint: The index name to use during creation
default: wiki
order: 5
sniffOnStart:
type: Boolean type: Boolean
title: Sniff on start title: Sniff on start
hint: 'Should Wiki.js attempt to detect the rest of the cluster on first connect? (Default: off)' hint: 'Should Wiki.js attempt to detect the rest of the cluster on first connect? (Default: off)'
default: false default: false
order: 5 order: 6
sniffInterval: sniffInterval:
type: Number type: Number
title: Sniff Interval title: Sniff Interval
hint: '0 = disabled, Interval in seconds to check for updated list of nodes in cluster. (Default: 0)' hint: '0 = disabled, Interval in seconds to check for updated list of nodes in cluster. (Default: 0)'
order: 6
default: 0 default: 0
order: 7

View File

@ -1,26 +1,297 @@
const _ = require('lodash')
const elasticsearch = require('elasticsearch')
const { pipeline, Transform } = require('stream')
/* global WIKI */
module.exports = { module.exports = {
activate() { async activate() {
// not used
}, },
deactivate() { async deactivate() {
// not used
}, },
query() { /**
* INIT
*/
async init() {
WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
this.client = new elasticsearch.Client({
apiVersion: this.config.apiVersion,
hosts: this.config.hosts.split(',').map(_.trim),
httpAuth: (this.config.user.length > 0) ? `${this.config.user}:${this.config.pass}` : null,
sniffOnStart: this.config.sniffOnStart,
sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false
})
// -> Create Search Index
await this.createIndex()
WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
}, },
created() { /**
* Create Index
*/
async createIndex() {
const indexExists = await this.client.indices.exists({ index: this.config.indexName })
if (!indexExists) {
WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
await this.client.indices.create({
index: this.config.indexName,
body: {
mappings: {
_doc: {
properties: {
suggest: { type: 'completion' },
title: { type: 'text', boost: 4.0 },
description: { type: 'text', boost: 3.0 },
content: { type: 'text', boost: 1.0 },
locale: { type: 'keyword' },
path: { type: 'text' }
}
}
}
}
})
}
}, },
updated() { /**
* QUERY
*
* @param {String} q Query
* @param {Object} opts Additional options
*/
async query(q, opts) {
try {
const results = await this.client.search({
index: this.config.indexName,
body: {
query: {
simple_query_string: {
query: q
}
},
from: 0,
size: 50,
_source: ['title', 'description', 'path', 'locale'],
suggest: {
suggestions: {
text: q,
completion: {
field: 'suggest',
size: 5,
skip_duplicates: true,
fuzzy: true
}
}
}
}
})
return {
results: _.get(results, 'hits.hits', []).map(r => ({
id: r._id,
locale: r._source.locale,
path: r._source.path,
title: r._source.title,
description: r._source.description
})),
suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
totalHits: results.hits.total
}
} catch (err) {
WIKI.logger.warn('Search Engine Error:')
WIKI.logger.warn(err)
}
}, },
deleted() { /**
* Build suggest field
*/
buildSuggest(page) {
return _.uniq(_.concat(
page.title.split(' ').map(s => ({
input: s,
weight: 4
})),
page.description.split(' ').map(s => ({
input: s,
weight: 3
})),
page.content.split(' ').map(s => ({
input: s,
weight: 1
}))
))
}, },
renamed() { /**
* CREATE
*
* @param {Object} page Page to create
*/
async created(page) {
await this.client.index({
index: this.config.indexName,
type: '_doc',
id: page.hash,
body: {
suggest: this.buildSuggest(page),
locale: page.localeCode,
path: page.path,
title: page.title,
description: page.description,
content: page.content
},
refresh: true
})
}, },
rebuild() { /**
* UPDATE
*
* @param {Object} page Page to update
*/
async updated(page) {
await this.client.index({
index: this.config.indexName,
type: '_doc',
id: page.hash,
body: {
suggest: this.buildSuggest(page),
locale: page.localeCode,
path: page.path,
title: page.title,
description: page.description,
content: page.content
},
refresh: true
})
},
/**
* DELETE
*
* @param {Object} page Page to delete
*/
async deleted(page) {
await this.client.delete({
index: this.config.indexName,
type: '_doc',
id: page.hash,
refresh: true
})
},
/**
* RENAME
*
* @param {Object} page Page to rename
*/
async renamed(page) {
await this.client.delete({
index: this.config.indexName,
type: '_doc',
id: page.sourceHash,
refresh: true
})
await this.client.index({
index: this.config.indexName,
type: '_doc',
id: page.destinationHash,
body: {
suggest: this.buildSuggest(page),
locale: page.localeCode,
path: page.destinationPath,
title: page.title,
description: page.description,
content: page.content
},
refresh: true
})
},
/**
* REBUILD INDEX
*/
async rebuild() {
WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
await this.client.indices.delete({ index: this.config.indexName })
await this.createIndex()
const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
const MAX_INDEXING_COUNT = 1000
const COMMA_BYTES = Buffer.from(',').byteLength
let chunks = []
let bytes = 0
const processDocument = async (cb, doc) => {
try {
if (doc) {
const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
// -> Current batch exceeds size limit, flush
if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
await flushBuffer()
}
if (chunks.length > 0) {
bytes += COMMA_BYTES
}
bytes += docBytes
chunks.push(doc)
// -> Current batch exceeds count limit, flush
if (chunks.length >= MAX_INDEXING_COUNT) {
await flushBuffer()
}
} else {
// -> End of stream, flush
await flushBuffer()
}
cb()
} catch (err) {
cb(err)
}
}
const flushBuffer = async () => {
WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
try {
await this.client.bulk({
index: this.config.indexName,
body: _.reduce(chunks, (result, doc) => {
result.push({
index: {
_index: this.config.indexName,
_type: '_doc',
_id: doc.id
}
})
result.push({
suggest: this.buildSuggest(doc),
locale: doc.locale,
path: doc.path,
title: doc.title,
description: doc.description,
content: doc.content
})
return result
}, []),
refresh: true
})
} catch (err) {
WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
}
chunks.length = 0
bytes = 0
}
await pipeline(
WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'content').select().from('pages').where({
isPublished: true,
isPrivate: false
}).stream(),
new Transform({
objectMode: true,
transform: async (chunk, enc, cb) => processDocument(cb, chunk),
flush: async (cb) => processDocument(cb)
})
)
WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
} }
} }