生产环境实践 - oceanbase/canal GitHub Wiki
部署
Canal Deployer部署
服务端推荐部署方式: Canal Admin + Canal Deployer集群 + MQ(Kafka)
Canal的Deployer模块对应了Canal的服务端,提供了单机部署和集群部署两种模式,在生产实践中推荐使用集群部署。Canal Deployer会保证在一个Zookeeper集群中只且只有一个Server在实际执行,并且同一个Canal Sever上同一个Canal Instance有且只有一个实例在执行。
集群部署时,Zookeeper是必需项,Canal Admin为可选项,在生产实践中推荐使用Canal Admin对集群进行运维管理。当使用Canal Admin时,Canal Deployer服务本身只有conf/canal_local.properties
中对应的配置生效,其他的配置项全部交由Canal Admin通过集群主配置管理。
Canal Deployer在对日志完成转化处理后,会将得到的Canal Entry放入内存中的循环队列供消费组件消费,目前主要提供了TCP和MQ两种消费方式,在生产实践中推荐使用MQ模式,便于控制流量以及问题排查。
整体的操作流程可以参考文档 部署canal.deployer(OB作为数据源)#通过canal-admin部署
Canal Client Adapter的部署
Canal Adapter同样支持集群部署,也一样是通过zookeeper保证有且只有一个实例在运行。它的消费位点管理跟使用的模式有关:
- TCP模式,adapter直接通过TCP连接到Canal服务端,消费的位置信息即是服务端管理的消费位点。
- MQ模式,adapter通过连接MQ获取Canal Entry而不跟Canal服务端直接交互,其消费的位置由它和MQ自己管理。
当使用Kafka时,可以通过删除原有offset并将kafka.auto.offset.reset
设为earliest
的方式从头开始重新消费。
Canal Client Adpater的ACK与异常处理
Canal Adapter在接收到信息时,会先执行写操作然后再ACK。当出现异常时,程序会跳出对当前batch消息的处理,在设置的重试次数内进行重试,达到最大重试次数后,有两种处理逻辑,一种是直接进行ACK并继续执行后续逻辑,另一种是中断操作并退出连接,用户可以使用terminateOnException
配置来选择使用哪种处理方式。需要注意的是,默认情况下application.yml
中的retries为0,terminateOnException默认为false,也就是不进行重试,且出错时不停止服务,而是打出一个日志然后继续执行。
在生产实践中,重试次数一般并不推荐给一个较大的值,因为写入的异常通常无法通过重试解决,当重试次数很大时,将会造成写入逻辑被阻塞。当异常出现时,通常会继续执行后续逻辑,然后由人工及时介入进行数据补偿工作。
数据的完整性与幂等性
完整性
Canal的数据完整性主要靠维护消费位点来保证。
- 在服务端,Canal Deployer维护消费位点并定期持久化。当发生重启时,Canal Deployer会从记录的消费位点开始重新拉取增量日志。
- 在客户端,如Kafka、Canal Adapter,通过在写入完成后commit,保证数据消费完成后才更新消费位点。
幂等性
Canal的消息幂等性是由客户端来保证。
Canal Adapter RDB维护幂等性的逻辑主要是:借助主键和唯一键实现增量日志的可重复消费:
-
DML类型的消息默认是可以重复执行的。在处理DML类型的CanalEntry时,Adapter RDB会将每条消息转化为一条SQL语句,该语句会携带主键或唯一键作为查询条件,因此对于DELETE和UPDATE操作,消息重放不会引起异常。对于INSERT操作,当记录被重复消费执行时,会产生主键或唯一键冲突,此时会根据skipDupException配置项的值决定是否忽略该异常继续执行,该配置默认值为true,也就是忽略重复插入的异常。
-
DDL类型的消息是不能重复执行的。由于DDL类型的CanalEntry引起异常时,很难去判断该异常是否在预期内,因此Adapter RDB不会对DDL的异常做特殊处理,此时将会进入前一小节所述的异常处理逻辑。当使用默认逻辑时,程序会对整个batch ack然后继续处理后续CanalEntry,此时与抛出异常的DDL在同一个batch中的消息中,处于该DDL之后的消息将会在执行前被丢弃,造成数据的丢失。因此,对于DDL引起的异常,通常需要人工介入处理。
在生产实践中,DDL需要谨慎处理。一种做法是在Canal Server端直接过滤掉DDL,当需要时人工执行DDL操作;或者在增量开始前,确保源端和目标端的表结构一致。需要重启或回退起始位点,则需要确保DDL不被重复执行。
位点管理
Canal Deployer的位点管理
Canal Deployer包含两个跟位点控制有关的组件:logPositionManager用于存放解析位点,metaManager用于存放消费位点。
需要注意,这里的消费位点对应的消费者,是针对Canal Deployer而言的。当使用TCP时,消费者是通过TCP端口连接进行消费的客户端,消费位点记录的是客户端最新ACK的位点;当使用MQ时,消费者即为MQ,消费位点记录的是MQ最新ACK的位点,也就是写入MQ成功的位点。
Canal Deployer主要通过Spring XML文件来创建数据同步通路,当使用集群部署时,对应的配置文件为 conf/spring/ob-default-instance.xml。可以看到它的两个位点管理器的实现:
- logPositionManager:组合类型,第一层为内存,第二层为metaManager
- metaManager:使用zookeeper存放位点信息。路径为
/otter/canal/destinations/${destination}/1001/cursor
该配置下的启动时和运行中两种情况:
- 启动时,起始时间戳优先级:Zookeeper存储位点(毫秒时间戳) > instance.properties配置(秒级时间戳) >
0
。 - 运行中,解析位点使用logPositionManager第一层,即存放在内存;消费位点存放在metaManager,它会定期将消费位点写入zookeeper。
起点调整
对于需要调整起点的情况,如想起点回退,有两种实现方式:
- 直接修改Zookeeper中的位点信息,将timestamp字段改为需要的值。
- 确保Zookeeper中没有位点信息,然后修改instance.properties中的
canal.instance.oceanbase.startTimestamp
操作顺序:先停止Canal Deployer服务,然后做出上述修改,完成后再启动集群,即可完成起始时间的调整。
Canal Adapter的位点管理
TCP模式下,Canal Adapter的消费位点由Canal Deployer维护;MQ模式下,Canal Adapter消费的偏移量由MQ记录,Canal Adapter通过先执行修改再更新偏移量,保证消费数据的完整性。
并发
Canal Deployer的并发处理
在canal整体性能优化中,Canal Deployer的parse模块引入了基于ringbuffer实现的MultiStageCoprocessor,实现了解析过程的并行化。用户可以通过canal.properties中的canal.instance.parser.parallel
字段来控制是否使用并行处理。
Canal Adpater RDB的并发处理
当rdb的表映射文件中设置concurrent为true时,表示将开启并发写入。此处的消息逻辑见代码片段,具体的做法是创建了n个List用于存放待写入数据,每一个batch的数据传入时,会根据其中每条记录的主键进行哈希,将该条数据放入其中一个List,分配完成后,启动包含n个线程的线程池并发执行写入。此处的n默认为3。
由于是通过主键进行哈希来分开执行写入,所以当开启并发时,不同的数据行的写入顺序可能跟源端不同。如果要求顺序一致,该项建议关闭。
问题排查
日志信息
排查日志时,可以从数据流向上的源头开始向下游逐层排查。以 Deployer集群 + Kafka + Adapter RDB 的部署为例:
- Canal Server日志,位置在Deployer的
logs/canal/canal.log
- Canal Instance日志,位置在Deployer的
logs/${destination}/${destination}.log
- Zookeeper位点信息,即Kafka写入成功的位点记录,路径为
/otter/canal/destinations/${destination}/1001/cursor
- Kafka记录,可以借助KafkaTool等工具查看Message和Adapter消费到的offset
- Adapter日志,位置在Adapter的
logs/adapter/adapter.log
这样通过逐层排查,首先确定出错的位置在哪一层,再根据具体的报错信息锁定问题代码。其中,Canal Deployer组件的处理逻辑主要在Canal Instance层,因此当服务端出现异常时,该层需要着重排查。
LogProxyConnection测试
LogProxyConnection实际是使用oblogclient连接oblogproxy来获取日志的,当连接动作已经正确发起但却无法正常接收数据时,可以使用oblogclient直连oblogproxy进行验证,来确定是否是canal的问题,测试代码见测试文件。