mysql indexing using elasticsearch jdbc - downgoon/hello-world GitHub Wiki

elasticsearch-jdbc 是用JAVA写的,依靠执行SQL语句,把MySQL信息同步到elasticsearch里,以提供索引查询。

体验 elasticsearch-jdbc

mysql 导入 ES 结构图

import es from mysql.png

elasticsearch-jdbc 这个小工具工作原理:从mysql拉数据,装填到ES里面。从mysql拉数据,执行SQL语句,而不是binlog caputre;向ES装填数据,用的是9300端口(TCP RPC端口),而不是9200端口(HTTP REST端口)。

那么从MySQL到ES的,数据模型映射关系上:database -> index; table -> type; schema -> mapping。

另外一个问题是:依靠执行SQL,怎么实现增量同步ES呢?

安装

$ wget http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.4.0/elasticsearch-jdbc-2.3.4.0-dist.zip
$ unzip elasticsearch-jdbc-2.3.4.0-dist.zip

全量导入ES

#!/bin/sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "root",
        "password" : "123456",
        "sql" : "select *, id as _id from article",
        "elasticsearch" : {
             "cluster" : "elasticsearch",
             "host" : "localhost",
             "port" : 9300
        },
        "index" : "test",
        "type" : "article",
        "type_mapping": {
            "article" : {
                "properties" : {
                    "id" : {
                        "type" : "integer",
                        "index" : "not_analyzed"
                    }
                }
            }
        }
    }
}
' | java \
    -cp "${lib}/*" \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

上面的脚本主要是执行了 org.xbib.tools.JDBCImporter ,配置是一个JSON格式。有趣的是Java里面也支持管道,实现类是org.xbib.tools.Runner

这个JSON,主要配置了两个方面:

  • mysql端:数据库的账号密码+SQL语句。比如这里的select *, id as _id from article

  • ES端:ES地址+Index。test->article,其中id字段不纳入索引。

测试搜索


$ curl -XGET 'http://localhost:9200/test/article/_search?pretty' -i

刚才录入的记录全部会返回。如果要搜索作者为tommy的文章呢?

curl -XGET 'http://localhost:9200/test/article/_search?pretty&q=author:tommy' -i
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 582

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.30685282,
    "hits" : [ {
      "_index" : "test",
      "_type" : "article",
      "_id" : "5",
      "_score" : 0.30685282,
      "_source" : {
        "id" : 5,
        "subject" : "希拉里团队炮轰FBI 参院民主党领袖批其“违法”",
        "author" : "tommy",
        "create_time" : "2016-10-31T17:52:07.000+08:00",
        "update_time" : "2016-10-31T17:52:09.000+08:00"
      }
    } ]
  }
}

增量导入ES


#!/bin/sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "root",
        "password" : "123456",

        "sql" : [ {
            "statement" : "select *, id as _id from article where update_time > ?",
            "parameter" : [ "$metrics.lastexecutionstart" ]
           } ],

       "metrics" : {
           "lastexecutionend" : "2017-02-19T06:01:01.441Z",
           "lastexecutionstart" : "2017-03-20T06:01:01.125Z",
           "counter" : "23"
        },

        "statefile" : "statefile-article.json",
        "schedule" : "0 0-59 0-23 ? * *",

        "elasticsearch" : {
             "cluster" : "elasticsearch",
             "host" : "localhost",
             "port" : 9300
        },
        "index" : "test",
        "type" : "article",
        "type_mapping": {
            "article" : {
                "properties" : {
                    "id" : {
                        "type" : "integer",
                        "index" : "not_analyzed"
                    }
                }
            }
        }
    }
}
' | java \
    -cp "${lib}/*" \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter 

其中重要配置是:

"sql" : [ {
            "statement" : "select *, id as _id from article where update_time > ?",
            "parameter" : [ "$metrics.lastexecutionstart" ]
           } ],

       "metrics" : {
           "lastexecutionend" : "2017-02-19T06:01:01.441Z",
           "lastexecutionstart" : "2017-03-20T06:01:01.125Z",
           "counter" : "23"
        },

        "statefile" : "statefile-article.json",
        "schedule" : "0 0-59 0-23 ? * *",

前期准备:MySQL + ES

创建数据库及其表数据


DROP TABLE IF EXISTS `article`;
CREATE TABLE `article` (
  `id` mediumint(8) unsigned NOT NULL AUTO_INCREMENT,
  `subject` varchar(150) NOT NULL,
  `author` varchar(15) DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT NULL,
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;


# 数据
INSERT INTO `article` VALUES ('1', '"闺蜜"崔顺实被韩检方传唤 韩总统府促彻查真相', 'jam', '2016-10-31 17:49:21', '2016-10-31 17:50:21');
INSERT INTO `article` VALUES ('2', '韩举行"护国训练" 青瓦台:决不许国家安全出问题', 'jam00', '2016-10-31 17:50:39', '2016-10-31 17:50:51');
INSERT INTO `article` VALUES ('3', '媒体称FBI已经取得搜查令 检视希拉里电邮', 'tomi', '2016-10-31 17:51:03', '2016-10-31 17:51:08');
INSERT INTO `article` VALUES ('4', '村上春树获安徒生奖 演讲中谈及欧洲排外问题', 'jason', '2016-10-31 17:51:38', '2016-10-31 17:51:41');
INSERT INTO `article` VALUES ('5', '希拉里团队炮轰FBI 参院民主党领袖批其“违法”', 'tommy', '2016-10-31 17:52:07', '2016-10-31 17:52:09');

其中 update_time 字段,设置了一个自动更新的特性 ON UPDATE CURRENT_TIMESTAMP

update_time timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP

安装配套的ES

JDBC Importer version Elasticsearch version
2.3.4.0 2.3.4

2.3.4版本下载地址: https://www.elastic.co/downloads/past-releases/elasticsearch-2-3-4 源代码地址:https://github.com/elastic/elasticsearch

  • 安装
$wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.4/elasticsearch-2.3.4.tar.gz
$ tar zxvf elasticsearch-2.3.4.tar.gz
  • 运行
$ bin/elasticsearch
  • 测试
$ curl -X GET http://localhost:9200/
  • 建索引
$ curl -XPUT 'http://localhost:9200/twitter/user/kimchy?pretty' -H 'Content-Type: application/json' -d '{ "name" : "Shay Banon" }'

这条指令的语意是:在 twitter 这个索引集里面,创建一个user对象,_id=kimchyname=Shay Banon 。如果类比关系型,那么 twitter 是数据库名字,user 是表名字,kimchy 是记录ID。

字段_id为 elasticsearch 保留字。

  • 查询:精确查询
$ curl -XGET 'http://localhost:9200/twitter/user/kimchy?pretty=true'

这条指令的语意是:查询user对象中,_id为kimchy的对象。

  • 搜索:模糊查询
$ curl -XGET 'http://localhost:9200/twitter/user/_search?q=name:Shay&pretty=true' -i
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 371

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.19178301,
    "hits" : [ {
      "_index" : "twitter",
      "_type" : "user",
      "_id" : "kimchy",
      "_score" : 0.19178301,
      "_source" : {
        "name" : "Shay Banon"
      }
    } ]
  }
}

这条指令的关注点是:_search?q=name:Shay , 语义是名字为Shay的,但是这是一个搜索,而不是精确查询。

顺便说一下,ES查询表达式,还可以用JSON描述:

$ curl -XGET 'http://localhost:9200/twitter/user/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "match" : { "name": "Shay" }
    }
}'

思考&启发

以往存储都以关系型(比如MySQL)为基石。做个东西,需要写RESTful API、Dao和数据库运维管理等。遇到模糊查询的,MySQL还不是太方便处理。有没有想过,有些应用(比方说普通的后台管理),能不能直接用ES作为存储,这样连RESTful API 都不用写,而且还具备模糊查询的功能。而且,ES的数据可以导出到MySQL,甚至也有人做SQL on elasticsearch 的开源。同时,ES的接口是规范化的,那么这样前端JS就应该有对接ES的标准化库 https://www.elastic.co/blog/client-for-node-js-and-the-browser

DB-Engines: 2016年10月份全球数据库排名

数据库排名.png

Apache Calcite: SQL on Anything

Calcite的目标是“one size fits all(一种方案适应所有需求场景),希望能为不同计算平台和数据源提供统一的查询引擎(Facebook支持用SQL语句查询机器的内存、CPU等),以类似传统数据库的访问方式(SQL和高级查询优化)来访问Hadoop上的数据。

参考资料