// wikijs-fork/server/modules/search/aws/engine.js
// Wiki.js search engine module backed by AWS CloudSearch.
const _ = require('lodash')
const AWS = require('aws-sdk')
const stream = require('stream')
const Promise = require('bluebird')
const pipeline = Promise.promisify(stream.pipeline)
/* global WIKI */
module.exports = {
async activate() {
// not used
2018-09-01 03:42:14 +00:00
},
2019-03-13 06:52:08 +00:00
async deactivate() {
// not used
2018-09-01 03:42:14 +00:00
},
2019-03-13 06:52:08 +00:00
/**
* INIT
*/
async init() {
WIKI.logger.info(`(SEARCH/AWS) Initializing...`)
this.client = new AWS.CloudSearch({
apiVersion: '2013-01-01',
accessKeyId: this.config.accessKeyId,
secretAccessKey: this.config.secretAccessKey,
region: this.config.region
})
2019-03-16 22:39:31 +00:00
this.clientDomain = new AWS.CloudSearchDomain({
apiVersion: '2013-01-01',
endpoint: this.config.endpoint,
accessKeyId: this.config.accessKeyId,
secretAccessKey: this.config.secretAccessKey,
region: this.config.region
})
2018-09-01 03:42:14 +00:00
2019-03-13 06:52:08 +00:00
let rebuildIndex = false
2018-09-01 03:42:14 +00:00
2019-03-13 06:52:08 +00:00
// -> Define Analysis Schemes
const schemes = await this.client.describeAnalysisSchemes({
DomainName: this.config.domain,
AnalysisSchemeNames: ['default_anlscheme']
}).promise()
if (_.get(schemes, 'AnalysisSchemes', []).length < 1) {
WIKI.logger.info(`(SEARCH/AWS) Defining Analysis Scheme...`)
await this.client.defineAnalysisScheme({
DomainName: this.config.domain,
AnalysisScheme: {
AnalysisSchemeLanguage: this.config.AnalysisSchemeLang,
AnalysisSchemeName: 'default_anlscheme'
}
}).promise()
rebuildIndex = true
}
2018-09-01 03:42:14 +00:00
2019-03-13 06:52:08 +00:00
// -> Define Index Fields
const fields = await this.client.describeIndexFields({
DomainName: this.config.domain
}).promise()
if (_.get(fields, 'IndexFields', []).length < 1) {
WIKI.logger.info(`(SEARCH/AWS) Defining Index Fields...`)
await this.client.defineIndexField({
DomainName: this.config.domain,
IndexField: {
IndexFieldName: 'id',
IndexFieldType: 'literal'
}
}).promise()
await this.client.defineIndexField({
DomainName: this.config.domain,
IndexField: {
IndexFieldName: 'path',
IndexFieldType: 'literal'
}
}).promise()
await this.client.defineIndexField({
DomainName: this.config.domain,
IndexField: {
IndexFieldName: 'locale',
IndexFieldType: 'literal'
}
}).promise()
await this.client.defineIndexField({
DomainName: this.config.domain,
IndexField: {
IndexFieldName: 'title',
IndexFieldType: 'text',
TextOptions: {
ReturnEnabled: true,
AnalysisScheme: 'default_anlscheme'
}
}
}).promise()
await this.client.defineIndexField({
DomainName: this.config.domain,
IndexField: {
IndexFieldName: 'description',
IndexFieldType: 'text',
TextOptions: {
ReturnEnabled: true,
AnalysisScheme: 'default_anlscheme'
}
}
}).promise()
await this.client.defineIndexField({
DomainName: this.config.domain,
IndexField: {
IndexFieldName: 'content',
IndexFieldType: 'text',
TextOptions: {
ReturnEnabled: false,
AnalysisScheme: 'default_anlscheme'
}
}
}).promise()
rebuildIndex = true
}
2018-09-01 03:42:14 +00:00
2019-03-18 01:52:16 +00:00
// -> Define suggester
2019-03-13 06:52:08 +00:00
const suggesters = await this.client.describeSuggesters({
DomainName: this.config.domain,
SuggesterNames: ['default_suggester']
}).promise()
2019-03-18 01:52:16 +00:00
if (_.get(suggesters, 'Suggesters', []).length < 1) {
2019-03-13 06:52:08 +00:00
WIKI.logger.info(`(SEARCH/AWS) Defining Suggester...`)
await this.client.defineSuggester({
DomainName: this.config.domain,
Suggester: {
SuggesterName: 'default_suggester',
DocumentSuggesterOptions: {
SourceField: 'title',
FuzzyMatching: 'high'
}
}
}).promise()
rebuildIndex = true
}
2018-09-01 03:42:14 +00:00
2019-03-13 06:52:08 +00:00
// -> Rebuild Index
if (rebuildIndex) {
WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`)
await this.client.indexDocuments({
DomainName: this.config.domain
}).promise()
}
2018-09-01 03:42:14 +00:00
2019-03-13 06:52:08 +00:00
WIKI.logger.info(`(SEARCH/AWS) Initialization completed.`)
},
/**
* QUERY
*
* @param {String} q Query
* @param {Object} opts Additional options
*/
async query(q, opts) {
try {
2019-03-16 22:39:31 +00:00
let suggestions = []
const results = await this.clientDomain.search({
query: q,
partial: true,
size: 50
}).promise()
if (results.hits.found < 5) {
const suggestResults = await this.clientDomain.suggest({
query: q,
suggester: 'default_suggester',
size: 5
}).promise()
suggestions = suggestResults.suggest.suggestions.map(s => s.suggestion)
}
2019-03-13 06:52:08 +00:00
return {
2019-03-16 22:39:31 +00:00
results: _.map(results.hits.hit, r => ({
id: r.id,
path: _.head(r.fields.path),
locale: _.head(r.fields.locale),
title: _.head(r.fields.title) || '',
description: _.head(r.fields.description) || ''
2019-03-16 22:39:31 +00:00
})),
suggestions: suggestions,
totalHits: results.hits.found
2019-03-13 06:52:08 +00:00
}
} catch (err) {
WIKI.logger.warn('Search Engine Error:')
WIKI.logger.warn(err)
}
},
/**
* CREATE
*
* @param {Object} page Page to create
*/
async created(page) {
2019-03-16 22:39:31 +00:00
await this.clientDomain.uploadDocuments({
contentType: 'application/json',
documents: JSON.stringify([
{
type: 'add',
id: page.hash,
fields: {
locale: page.localeCode,
path: page.path,
title: page.title,
description: page.description,
content: page.safeContent
2019-03-16 22:39:31 +00:00
}
}
])
}).promise()
2019-03-13 06:52:08 +00:00
},
/**
* UPDATE
*
* @param {Object} page Page to update
*/
async updated(page) {
2019-03-16 22:39:31 +00:00
await this.clientDomain.uploadDocuments({
contentType: 'application/json',
documents: JSON.stringify([
{
type: 'add',
id: page.hash,
fields: {
locale: page.localeCode,
path: page.path,
title: page.title,
description: page.description,
content: page.safeContent
2019-03-16 22:39:31 +00:00
}
}
])
}).promise()
2019-03-13 06:52:08 +00:00
},
/**
* DELETE
*
* @param {Object} page Page to delete
*/
async deleted(page) {
2019-03-16 22:39:31 +00:00
await this.clientDomain.uploadDocuments({
contentType: 'application/json',
documents: JSON.stringify([
{
type: 'delete',
id: page.hash
}
])
}).promise()
2019-03-13 06:52:08 +00:00
},
/**
* RENAME
*
* @param {Object} page Page to rename
*/
async renamed(page) {
2019-03-16 22:39:31 +00:00
await this.clientDomain.uploadDocuments({
contentType: 'application/json',
documents: JSON.stringify([
{
type: 'delete',
2019-10-13 23:59:50 +00:00
id: page.hash
2019-03-16 22:39:31 +00:00
}
])
}).promise()
await this.clientDomain.uploadDocuments({
contentType: 'application/json',
documents: JSON.stringify([
{
type: 'add',
id: page.destinationHash,
fields: {
2019-10-13 23:59:50 +00:00
locale: page.destinationLocaleCode,
2019-03-16 22:39:31 +00:00
path: page.destinationPath,
title: page.title,
description: page.description,
content: page.safeContent
2019-03-16 22:39:31 +00:00
}
}
])
}).promise()
2019-03-13 06:52:08 +00:00
},
/**
* REBUILD INDEX
*/
async rebuild() {
2019-03-16 22:39:31 +00:00
WIKI.logger.info(`(SEARCH/AWS) Rebuilding Index...`)
const MAX_DOCUMENT_BYTES = Math.pow(2, 20)
const MAX_INDEXING_BYTES = 5 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength
const MAX_INDEXING_COUNT = 1000
const COMMA_BYTES = Buffer.from(',').byteLength
let chunks = []
let bytes = 0
const processDocument = async (cb, doc) => {
try {
if (doc) {
const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
// -> Document too large
if (docBytes >= MAX_DOCUMENT_BYTES) {
throw new Error('Document exceeds maximum size allowed by AWS CloudSearch.')
}
// -> Current batch exceeds size hard limit, flush
if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
await flushBuffer()
}
if (chunks.length > 0) {
bytes += COMMA_BYTES
}
bytes += docBytes
chunks.push(doc)
// -> Current batch exceeds count soft limit, flush
if (chunks.length >= MAX_INDEXING_COUNT) {
await flushBuffer()
}
} else {
// -> End of stream, flush
await flushBuffer()
}
cb()
} catch (err) {
cb(err)
}
}
const flushBuffer = async () => {
WIKI.logger.info(`(SEARCH/AWS) Sending batch of ${chunks.length}...`)
try {
2019-03-18 01:52:16 +00:00
await this.clientDomain.uploadDocuments({
2019-03-16 22:39:31 +00:00
contentType: 'application/json',
documents: JSON.stringify(_.map(chunks, doc => ({
type: 'add',
id: doc.id,
fields: {
locale: doc.locale,
path: doc.path,
title: doc.title,
description: doc.description,
content: WIKI.models.pages.cleanHTML(doc.render)
2019-03-16 22:39:31 +00:00
}
})))
}).promise()
} catch (err) {
WIKI.logger.warn('(SEARCH/AWS) Failed to send batch to AWS CloudSearch: ', err)
}
chunks.length = 0
bytes = 0
}
2019-03-13 06:52:08 +00:00
await pipeline(
WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render').select().from('pages').where({
2019-03-13 06:52:08 +00:00
isPublished: true,
isPrivate: false
}).stream(),
new stream.Transform({
2019-03-16 22:39:31 +00:00
objectMode: true,
2019-03-18 01:52:16 +00:00
transform: async (chunk, enc, cb) => processDocument(cb, chunk),
flush: async (cb) => processDocument(cb)
2019-03-16 22:39:31 +00:00
})
2019-03-13 06:52:08 +00:00
)
2019-03-16 22:39:31 +00:00
WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`)
await this.client.indexDocuments({
DomainName: this.config.domain
}).promise()
WIKI.logger.info(`(SEARCH/AWS) Index rebuilt successfully.`)
2018-09-01 03:42:14 +00:00
}
}