Optimizing Full-Text Search over Tens of Millions of Rows with Elasticsearch


I previously wrote about using MySQL's full-text index for search, but it caused all kinds of problems in practice. Relevance was poor, queries were slow, and there was one fatal flaw: as soon as the search keyword grew beyond five characters, the query would simply fail.


First, install Elasticsearch. I'm using version 7.10.0, the latest release at the time of writing. Grab it from the official Download Elasticsearch page; there are Windows and Linux builds and they work the same way. I tested with the Windows build and run the Linux build in production, so the steps below use the Linux version. You also need to install the matching version of the ik analyzer plugin for Chinese word segmentation.

Then extract the archive:
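A minimal sketch of the extraction and plugin install, assuming the Linux tar.gz package for 7.10.0 (the plugin URL below is the medcl release that matches the Elasticsearch version; adjust it if your version differs):

tar -zxvf elasticsearch-7.10.0-linux-x86_64.tar.gz
cd elasticsearch-7.10.0
# install the ik analyzer matching the Elasticsearch version (URL assumed; check the ik release page)
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.10.0/elasticsearch-analysis-ik-7.10.0.zip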
Going into the bin directory and running ./elasticsearch directly will fail if you are logged in as root, because Elasticsearch refuses to run as root. If you are already on a non-root account you can skip this step; otherwise create a new user, change the file ownership, and switch to that user:

useradd elasticsearch
chown -R elasticsearch:elasticsearch ./elasticsearch-7.10.0
su elasticsearch

Go back into the bin directory and run ./elasticsearch again.
Check that it started successfully:

curl -XGET "http://localhost:9200/"

Define an index, which is roughly the equivalent of a MySQL table.
I map four fields here: name, sheng, shi, and xian. The name field is of type text with the ik analyzer, so it gets the full-text index; sheng, shi, and xian are plain keyword fields, and any remaining columns imported from MySQL are left to Elasticsearch's automatic (dynamic) mapping.

curl -XPUT "http://localhost:9200/companysearch?pretty" -H 'Content-Type: application/json' -d '{
    "mappings": {
          "properties": {
                  "name": { "type": "text",        "analyzer": "ik_max_word",        "search_analyzer": "ik_smart"      },
                  "sheng": { "type": "keyword" },
                  "shi": { "type": "keyword" },
                  "xian": { "type": "keyword" }
          }
    }
}'
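A quick way to confirm the ik plugin is actually loaded is the _analyze API against the new index (the sample text here is just an arbitrary company-style string):

curl -XGET "http://localhost:9200/companysearch/_analyze?pretty" -H 'Content-Type: application/json' -d '{
    "analyzer": "ik_smart",
    "text": "北京未来科技有限公司"
}'

Swapping ik_smart for ik_max_word shows the finer-grained tokenization that is applied at index time.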

The next step is a one-time full import of the data from the MySQL server into Elasticsearch.
I use the official Node.js client from Elastic and the bulk API, since batching is much faster than indexing one document at a time.
Install the dependencies:

npm i @elastic/elasticsearch mysql

The import script (test.js):
const fs = require('fs')
const mysql = require('mysql')

const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })

const connection = mysql.createConnection({
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'company'
})
connection.connect()

var breakPoint = {
    id: 0, // id of the last MySQL row that has been synced
    limit: 5000, // number of rows fetched per batch
};

var elasticIndex = 'companysearch'
var isCon = true;

// Initialize the breakpoint from the checkpoint file, if one exists
try {
    fs.accessSync('./breakpoint', fs.constants.R_OK | fs.constants.W_OK);
    var breakPointTmp = fs.readFileSync('./breakpoint', { encoding: 'utf-8' })
    if (breakPointTmp) {
        // merge so that missing fields keep their defaults
        breakPoint = Object.assign(breakPoint, JSON.parse(breakPointTmp))
    }
    console.log('Breakpoint file loaded');
} catch (err) {
    console.error('No breakpoint file found');
    fs.appendFileSync('./breakpoint', '{}');
    console.log('Breakpoint file created');
}

// Save the breakpoint when the process is interrupted
process.on('SIGINT', async () => {
    console.log('Process exiting')
    isCon = false
    saveBreakPoint('process exited normally')
    process.exit();
});

// Sleep helper
function timeout(ms = 1000) {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    });
}

async function run() {
    // Import the data in batches
    while (isCon) {
        console.log('Starting a new import batch')

        // Fetch the next batch from MySQL, ordered by id so the breakpoint stays consistent
        var queryDbSql = `SELECT id,name,status,oper_name,regist_capi,start_date,sheng,shi,xian,country_id,phone,email,trade_id,trade FROM company WHERE id > ${breakPoint.id} ORDER BY id ASC LIMIT ${breakPoint.limit}`
        // console.log(queryDbSql)
        let dbData = await queryDb(queryDbSql)

        if (dbData && dbData.length) {
            // Write to Elasticsearch with the bulk API
            let body = dbData.flatMap(doc => [{ index: { _index: elasticIndex, _id: doc['id'] } }, doc])
            const { body: bulkResponse } = await client.bulk({ refresh: true, body })

            if (bulkResponse.errors) {
                const erroredDocuments = []
                // The items array has the same order of the dataset we just indexed.
                // The presence of the `error` key indicates that the operation
                // that we did for the document has failed.
                bulkResponse.items.forEach((action, i) => {
                    const operation = Object.keys(action)[0]
                    if (action[operation].error) {
                        erroredDocuments.push({
                            // If the status is 429 it means that you can retry the document,
                            // otherwise it's very likely a mapping error, and you should
                            // fix the document before to try it again.
                            status: action[operation].status,
                            error: action[operation].error,
                            operation: body[i * 2],
                            document: body[i * 2 + 1]
                        })
                    }
                })
                console.log(erroredDocuments)
            }

            // Advance the breakpoint to the id of the last row in this batch
            breakPoint.id = dbData[dbData.length - 1].id
            saveBreakPoint('breakpoint updated: ' + JSON.stringify(breakPoint))
            if (dbData.length < breakPoint.limit) { // fewer rows than the limit means we reached the end
                isCon = false
                connection.end();
            }

            const { body: count } = await client.count({ index: elasticIndex })
            console.log(count)
            await timeout()
        } else {
            console.log('No more data')
            isCon = false
            connection.end();
            saveBreakPoint('breakpoint updated: ' + JSON.stringify(breakPoint))
        }
    }
}

// Run a query against MySQL, wrapped in a Promise
function queryDb(querySql) {
    return new Promise((resolve, reject) => {
        connection.query(querySql, (error, results, fields) => {
            if (error) {
                console.log('[QUERY ERROR] - ', error.message);
                saveBreakPoint('query failed')
                reject(error)
                return;
            }
            resolve(results)
        })
    })
}

// Append a timestamped message to the log file
function log(msg = '') {
    let time = new Date()
    time += '\n'
    time += msg
    time += '\n------------------------------\n'
    fs.appendFileSync('log.txt', time)
}

// Persist the breakpoint to disk and log it
function saveBreakPoint(msg = '') {
    fs.writeFileSync('./breakpoint', JSON.stringify(breakPoint))
    log('breakpoint saved: ' + msg)
}

run().catch(console.log)

// connection.end();

Run the script to sync the data:

node test.js
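Once the import finishes, a quick sanity check from the command line is to count the documents and run a match query against the name field (the keyword below is just a placeholder):

curl -XGET "http://localhost:9200/companysearch/_count?pretty"

curl -XGET "http://localhost:9200/companysearch/_search?pretty" -H 'Content-Type: application/json' -d '{
    "query": { "match": { "name": "科技" } },
    "size": 3
}'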

Finally, wire it into the application. I'll use PHP for the demo; Elastic ships official clients for most languages:

// Company database search (new)
    public function companySearchNew()
    {
        $name = I('name', '', 'trim'); // search keyword
        $limit = I('limit', 10, 'intval');
        if ($limit > 30) {
            $limit = 30;
        }
        $page = I('page', 1, 'intval');
        $sheng = I('sheng', '', 'trim');
        $shi = I('shi', '', 'trim');
        $xian = I('xian', '', 'trim');
        $trade_id = I('trade_id', '', 'trim');
        $start_date = I('start_date', '', 'trim');
        $min_money = I('min_money', 0, 'intval');
        $max_money = I('max_money', 0, 'intval');

        $client = ClientBuilder::create()->build();

        $where = [];

        if (!empty($name)) { // search keyword: full-text match on the company name
            $where['bool']['must'][] = ['match' => ['name' => $name]];
        }
        if (!empty($sheng)) { // province filter
            $where['bool']['must'][] = ['match' => ['sheng' => $sheng]];
        }
        if (!empty($shi)) { // city filter
            $where['bool']['must'][] = ['match' => ['shi' => $shi]];
        }
        if (!empty($xian)) { // county filter
            $where['bool']['must'][] = ['match' => ['xian' => $xian]];
        }
        if (!empty($trade_id)) { // industry category
            $where['bool']['must'][] = ['match' => ['trade_id' => $trade_id]];
        }
        if (!empty($min_money)) { // minimum registered capital
            $where['bool']['must'][] = ['range' => ['regist_capi' => ['gte' => $min_money]]];
        }
        if (!empty($max_money)) { // maximum registered capital
            $where['bool']['must'][] = ['range' => ['regist_capi' => ['lte' => $max_money]]];
        }
        if (!empty($start_date)) { // founded on or after the given date
            $where['bool']['must'][] = [
                'range' => [
                    'start_date' => [
                        'gte' => $start_date,
                        'format' => 'yyyy-MM-dd'
                    ]
                ]
            ];
        }

        $params = [
            'size' => $limit,
            'from' => $limit * ($page - 1),
            'index' => 'companysearch',
            'type' => '_doc',
        ];

        if ($where) {
            $params['body'] = [
                'query' => $where,
                'collapse' => ['field' => 'phone.keyword'] // deduplicate results by phone number
                // 'sort' => ['id' => ['order' => 'desc']]
            ];
        }

        $response = $client->search($params);
        $this->ajaxReturn(1, $response);
    }
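For reference, the body this method ends up sending is an ordinary bool query plus a collapse clause. With a name keyword and a province filter, the equivalent raw request would look roughly like this (the values are placeholders):

curl -XGET "http://localhost:9200/companysearch/_search?pretty" -H 'Content-Type: application/json' -d '{
    "from": 0,
    "size": 10,
    "query": {
        "bool": {
            "must": [
                { "match": { "name": "科技" } },
                { "match": { "sheng": "北京" } }
            ]
        }
    },
    "collapse": { "field": "phone.keyword" }
}'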
