Canal对接Flink - oceanbase/canal GitHub Wiki

简介

Flink是新一代流批统一的计算引擎,它支持从多种存储容器中读取数据,进行处理后,再写入到多种存储容器中。Flink Connector既是完成上述数据读取和写入的连接器组件,连接了 Flink 计算引擎跟外界存储系统。

目前Flink已经支持了多种容器数据格式,其中就包括Canal JSON,即CanalEntry格式。因此,可以使用Flink作为Canal Deployer的消费端,将解析得到的Canal Entry输出到多种存储,或者通过Flink SQL进行表聚合等操作。

部署

Flink在以Canal JSON作为数据源时,支持文件和Kafka两种模式。此处源端以Kafka为例,目标端以MySQL为例,Flink版本使用1.13.5。

准备

部署Canal Deployer,并使用Kafka模式。参考 https://github.com/oceanbase/canal/wiki#canal-deployer

Flink部署

1.安装Flink

下载并解压到工作目录 https://flink.apache.org/downloads.html#apache-flink-1135

tar -xzf flink-1.13.5-bin-scala_2.11.tgz
cd flink-1.13.5-bin-scala_2.11

2.下载依赖包:

Connector

MySQL Driver https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/

3. 启动Flink

./bin/start-cluster.sh

4. 创建任务

打开SQL Client

./bin/sql-client.sh

参考文档,设置源端Kafka和目标端MySQL

CREATE TABLE kafka_user (
  id int primary key,
  name varchar
) WITH (
 'connector' = 'kafka',
 'topic' = 'user',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'properties.group.id' = 'g1',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE mysql_user
(
  id int primary key,
  name varchar
)
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
    'username'= 'root',
    'password'= '123456',
    'table-name' = 'user'
);

数据源配置完成后,可以通过SQL直接创建一个任务,如

insert into mysql_user select * from kafka_user;

源端的数据变动可以通过如下命令以表格的形式展示

SET sql-client.execution.result-mode=tableau;
select * from kafka_user;

执行日志可以查看log/flink-root-taskexecutor-*.log