前面写过用mysql的全文索引来做查询,可是mysql的全文索引在使用时有各种问题:查询结果相关度低、速度慢,而且还有一个致命缺点——当查询关键字多于5个字的时候,查询就会崩溃。
首先安装elasticsearch,我用的版本是7.10.0(写作时的最新版)。先到官网的Download Elasticsearch页面下载,有windows和linux版,用法一样。我测试用的是windows版,线上用的是linux版,这里以linux版为例。另外还要安装对应版本的ik分词器。
Download Elasticsearch
然后解压下载的压缩包,例如:
tar -zxvf elasticsearch-7.10.0-linux-x86_64.tar.gz
进入bin目录直接运行 ./elasticsearch
如果是用root用户运行会报错,因为elasticsearch不允许用root用户来运行;如果你不是用root用户的话就没问题。下面新建用户、更改文件所属并切换用户:
useradd elasticsearch
chown -R elasticsearch:elasticsearch ./elasticsearch-7.10.0
su elasticsearch
重新进入bin目录运行./elasticsearch
测试是否启动成功:
curl -XGET "http://localhost:9200/"
定义一个index(索引),相当于mysql的一个表:
我这里定义了四个字段:name、sheng、shi、xian。其中name设为text类型并使用ik分词来做全文索引,sheng、shi、xian设为keyword类型;其余没有在mapping中声明的字段,由elasticsearch在导入数据时自动设置类型。
curl -XPUT "http://localhost:9200/companysearch?pretty" -H 'Content-Type: application/json' -d '{
"mappings": {
"properties": {
"name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" },
"sheng": { "type": "keyword" },
"shi": { "type": "keyword" },
"xian": { "type": "keyword" }
}
}
}'
下一步是把mysql服务器中的数据一次导入elasticsearch中(全量)。
这里用elastic官方提供的nodejs包来导入,用bulk api批量导入会快一些:
安装依赖:
npm i @elastic/elasticsearch mysql
const fs = require('fs')
const mysql = require('mysql')
const { Client } = require('@elastic/elasticsearch')
// Elasticsearch client: a single local node on the default HTTP port.
const elasticOptions = { node: 'http://localhost:9200' };
const client = new Client(elasticOptions);

// MySQL connection to the source database that holds the rows to import.
const mysqlOptions = {
  host: '127.0.0.1',
  user: 'root',
  password: 'root',
  database: 'company',
};
const connection = mysql.createConnection(mysqlOptions);
connection.connect();
/**
 * Loads the saved import position ("breakpoint") from `file`.
 *
 * The result always has a usable `id` and `limit`: keys found in the file
 * override `defaults`, but a missing or invalid `id` / `limit` falls back to
 * the default. This matters because both values are later interpolated into
 * the SQL query, and the placeholder file created here used to contain `{}`.
 *
 * - File missing: it is created with the defaults and the defaults are returned.
 * - File empty or not valid JSON: the defaults are returned and the file is
 *   left untouched (it is rewritten on the next successful save).
 *
 * @param {string} file - path of the breakpoint file
 * @param {{id: number, limit: number}} defaults - values used when nothing valid is stored
 * @returns {{id: number, limit: number}} a fresh object; `defaults` is not mutated
 * @throws {Error} when the file exists but cannot be read, or cannot be created
 */
function loadBreakPoint(file, defaults) {
  let raw;
  try {
    raw = fs.readFileSync(file, { encoding: 'utf-8' });
  } catch (err) {
    if (err.code !== 'ENOENT') {
      throw err;
    }
    console.error('没有断点文件');
    fs.writeFileSync(file, JSON.stringify(defaults));
    console.log('生成断点文件成功');
    return { ...defaults };
  }

  let saved = null;
  if (raw.trim() !== '') {
    try {
      saved = JSON.parse(raw);
    } catch (err) {
      console.error('断点文件内容无效,使用默认断点: ' + err.message);
      return { ...defaults };
    }
  }

  const isPlainObject = saved !== null && typeof saved === 'object' && !Array.isArray(saved);
  const merged = isPlainObject ? { ...defaults, ...saved } : { ...defaults };
  if (!Number.isFinite(merged.id) || merged.id < 0) {
    merged.id = defaults.id;
  }
  if (!Number.isInteger(merged.limit) || merged.limit <= 0) {
    merged.limit = defaults.limit;
  }
  console.log('读取断点文件成功');
  return merged;
}

// Current import position, restored from ./breakpoint when that file exists.
let breakPoint = loadBreakPoint('./breakpoint', {
  id: 0, // id of the last MySQL row that has been synchronised
  limit: 5000, // number of rows fetched per query
});
// Name of the Elasticsearch index that receives the documents.
const elasticIndex = 'companysearch';
// Loop flag: the import keeps running while this is true.
let isCon = true;
// Ctrl+C (SIGINT) handling: stop the import loop, persist the current
// breakpoint so the next run can resume from it, then terminate the process.
async function handleInterrupt() {
  console.log('程序退出');
  isCon = false;
  saveBreakPoint('程序正常退出');
  process.exit();
}

process.on('SIGINT', handleInterrupt);
/**
 * Delay helper.
 *
 * @param {number} [ms=1000] - how long to wait, in milliseconds
 * @returns {Promise<void>} a promise that resolves (with no value) once the timer fires
 */
function timeout(ms = 1000) {
  return new Promise(function startTimer(done) {
    setTimeout(done, ms);
  });
}
/**
 * Copies rows from the MySQL `company` table into Elasticsearch, one batch at
 * a time, until the table is exhausted or `isCon` is cleared.
 *
 * Each round:
 *   1. selects up to `breakPoint.limit` rows with `id > breakPoint.id`,
 *      ordered by id;
 *   2. indexes them with one bulk request, using the row id as document `_id`;
 *   3. moves `breakPoint.id` to the id of the last row fetched and saves it;
 *   4. prints the index document count and pauses via `timeout()`.
 *
 * The breakpoint is the last id seen, not a running row count: the query
 * filters on `id > breakPoint.id`, so counting rows would skip or repeat data
 * whenever ids are not contiguous. `ORDER BY id` makes "last row" well defined.
 *
 * Documents rejected by the bulk request are printed, and the import still
 * moves on to the next batch.
 *
 * @returns {Promise<void>} resolves when the import loop has finished
 * @throws whatever `queryDb`, `client.bulk` or `client.count` reject with;
 *         the MySQL connection is closed in every case
 */
async function run() {
  try {
    while (isCon) {
      console.log('新一轮数据导入开始');

      // Both values come from the breakpoint file and are interpolated into
      // SQL, so force them to plain numbers first.
      const storedId = Number(breakPoint.id);
      const lastId = Number.isFinite(storedId) ? storedId : 0;
      const pageSize = Math.max(1, Math.floor(Number(breakPoint.limit)) || 5000);

      const queryDbSql = `SELECT id,name,status,oper_name,regist_capi,start_date,sheng,shi,xian,country_id,phone,email,trade_id,trade FROM company WHERE id > ${lastId} ORDER BY id ASC LIMIT ${pageSize}`;
      const rows = await queryDb(queryDbSql);

      // An empty array is truthy, so check the length explicitly: a bulk
      // request with an empty body is rejected.
      if (!rows || rows.length === 0) {
        console.log('没有数据');
        isCon = false;
        saveBreakPoint('正常更新断点:' + JSON.stringify(breakPoint));
        break;
      }

      // Bulk body format: an action line followed by the document itself.
      const body = rows.flatMap((doc) => [{ index: { _index: elasticIndex, _id: doc.id } }, doc]);
      const { body: bulkResponse } = await client.bulk({ refresh: true, body });

      if (bulkResponse.errors) {
        // `items` is in the same order as the documents that were sent, and
        // an `error` key marks a failed operation.
        const erroredDocuments = [];
        bulkResponse.items.forEach((action, i) => {
          const operation = Object.keys(action)[0];
          if (action[operation].error) {
            erroredDocuments.push({
              // Status 429 means the document can be retried; anything else
              // is most likely a mapping error that needs a fix first.
              status: action[operation].status,
              error: action[operation].error,
              operation: body[i * 2],
              document: body[i * 2 + 1],
            });
          }
        });
        console.log(erroredDocuments);
      }

      // Resume after the last row that was actually fetched.
      breakPoint.id = rows[rows.length - 1].id;
      saveBreakPoint('正常更新断点:' + JSON.stringify(breakPoint));

      // A short page means the table has been read to the end.
      if (rows.length < pageSize) {
        isCon = false;
      }

      const { body: count } = await client.count({ index: elasticIndex });
      console.log(count);
      await timeout();
    }
  } finally {
    // Close MySQL exactly once, on success and on failure alike, so an open
    // socket does not keep the process alive.
    connection.end();
  }
}
/**
 * Runs one SQL statement on the shared MySQL connection.
 *
 * On failure the error message is printed, the current breakpoint is saved,
 * and the returned promise rejects with the driver's error. Nothing is thrown
 * from inside the driver callback: an exception there cannot be caught by the
 * caller and would terminate the process instead of rejecting the promise.
 *
 * @param {string} querySql - the SQL text to execute
 * @returns {Promise<Array>} resolves with the result rows
 */
function queryDb(querySql) {
  return new Promise((resolve, reject) => {
    connection.query(querySql, (error, results) => {
      if (error) {
        console.log('[QUERY ERROR] - ', error.message);
        try {
          saveBreakPoint('数据查询出错');
        } catch (saveError) {
          // Failing to persist the breakpoint must not hide the query error.
          console.log('[BREAKPOINT ERROR] - ', saveError.message);
        }
        reject(error);
        return;
      }
      resolve(results);
    });
  });
}
/**
 * Appends one entry to log.txt: the current date and time, the message on the
 * next line, then a dashed separator line.
 *
 * @param {string} [msg=''] - text to record
 */
function log(msg = '') {
  const separator = '\n------------------------------\n';
  const stamp = new Date().toString();
  const entry = stamp + '\n' + msg + separator;
  fs.appendFileSync('log.txt', entry);
}
/**
 * Writes the current `breakPoint` object to ./breakpoint as JSON (replacing
 * the previous content) and records the save in the log.
 *
 * @param {string} [msg=''] - text appended to the log entry
 */
function saveBreakPoint(msg = '') {
  const serialized = JSON.stringify(breakPoint);
  fs.writeFileSync('./breakpoint', serialized);

  const logLine = '保存断点' + msg;
  log(logLine);
}
// Start the import. If it fails, report the error on stderr, make the process
// exit with a non-zero status, and drop the MySQL socket so that an open
// connection cannot keep the process alive.
run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
  connection.destroy();
});
运行脚本,同步数据
node test.js
最后对接项目中的程序,我用php做演示,官方有各种语言的包:
/**
 * Company search backed by the Elasticsearch index `companysearch`.
 *
 * Request parameters (all optional):
 *   name                  keyword matched against the analysed `name` field
 *   sheng / shi / xian    exact province / city / county
 *   trade_id              category
 *   min_money, max_money  bounds for the registered capital
 *   start_date            earliest founding date, format yyyy-MM-dd
 *   limit                 page size, clamped to 1..30 (default 10)
 *   page                  1-based page number (default 1)
 *
 * All supplied conditions are combined with AND. The keyword is a `must`
 * clause so it still restricts the result when filters are present; the
 * exact-value and range conditions go into `filter`. Results are collapsed on
 * `phone.keyword` so each phone number appears once. Without any condition
 * the search is sent with no body, as before.
 *
 * The raw Elasticsearch response is returned through ajaxReturn().
 */
public function companySearchNew()
{
    $name = I('name', '', 'trim'); // keyword
    // Keep paging inside sane bounds: a negative `from` or `size` is rejected
    // by Elasticsearch.
    $limit = max(1, min(30, I('limit', 10, 'intval')));
    $page = max(1, I('page', 1, 'intval'));
    $sheng = I('sheng', '', 'trim');
    $shi = I('shi', '', 'trim');
    $xian = I('xian', '', 'trim');
    $trade_id = I('trade_id', '', 'trim');
    $start_date = I('start_date', '', 'trim');
    $min_money = I('min_money', 0, 'intval');
    $max_money = I('max_money', 0, 'intval');

    $client = ClientBuilder::create()->build();

    // Clauses are appended, never assigned, so every condition supplied by
    // the caller ends up in the query.
    $must = [];
    $filter = [];

    if (!empty($name)) { // keyword
        $must[] = ['match' => ['name' => $name]];
    }

    // sheng / shi / xian are mapped as `keyword`, so an exact term lookup applies.
    $exactFields = ['sheng' => $sheng, 'shi' => $shi, 'xian' => $xian];
    foreach ($exactFields as $field => $value) {
        if (!empty($value)) {
            $filter[] = ['term' => [$field => $value]];
        }
    }

    if (!empty($trade_id)) { // category
        $filter[] = ['match' => ['trade_id' => $trade_id]];
    }

    // Registered capital: both bounds belong to a single range clause.
    // The import script indexes this value as `regist_capi`; it never writes
    // a `price` field.
    // NOTE(review): confirm `regist_capi` is mapped as a numeric type.
    $capital = [];
    if (!empty($min_money)) {
        $capital['gte'] = $min_money;
    }
    if (!empty($max_money)) {
        $capital['lte'] = $max_money;
    }
    if ($capital) {
        $filter[] = ['range' => ['regist_capi' => $capital]];
    }

    if (!empty($start_date)) { // founding date
        $filter[] = [
            'range' => [
                'start_date' => [
                    'gte' => $start_date,
                    'format' => 'yyyy-MM-dd',
                ],
            ],
        ];
    }

    $bool = [];
    if ($must) {
        $bool['must'] = $must;
    }
    if ($filter) {
        $bool['filter'] = $filter;
    }

    // No `type` parameter: typed search requests are deprecated in Elasticsearch 7.
    $params = [
        'size' => $limit,
        'from' => $limit * ($page - 1),
        'index' => 'companysearch',
    ];
    if ($bool) {
        $params['body'] = [
            'query' => ['bool' => $bool],
            'collapse' => ['field' => 'phone.keyword'], // de-duplicate by phone number
        ];
    }

    $response = $client->search($params);
    $this->ajaxReturn(1, $response);
}