type
status
date
slug
summary
tags
category
icon
password

1、Canal简介

1.1 Canal 是什么

Canal 是阿里旗下的一款开源框架,主要用途是解析 MySQL 数据库的 binlog 日志,提供增量数据订阅和消费
Canal 的工作原理比较简单:
notion image
  1. canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave,向 mysql master 发送 dump 协议。
  1. mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal)。
  1. canal 解析 binary log 对象(原始为 byte 流)。

1.2 Canal 的部署

Canal 有三种部署方式:
1、canal-embeded 部署。
notion image
2、canal-server 独立部署,客户端引入 canal-client 消费。
notion image
3、canal-server 独立部署,内部消费并投递到其他中间件,客户端再消费中间件里面的数据。
notion image
生产推荐使用第三种独立部署的方式。如果你能 hold 住 Netty 等高并发和分布式技术,也可以考虑使用前两种部署方式。
独立部署的教程参考官方文档

1.3 Canal的HA机制

canal 的 HA 机制分为两部分,canal server 和 canal client 分别有对应的 HA 实现
  • canal server: 为了减少对 mysql dump 的请求,不同 server 上的 instance 要求同一时间只能有一个处于 running,其他的处于 standby 状态.
  • canal client: 为了保证有序性,一份 instance 同一时间只能由一个 canal client 进行 get/ack/rollback 操作,否则客户端接收无法保证有序。
整个 HA 机制的控制主要是依赖了zookeeper 的几个特性: watcherEPHEMERAL 节点
notion image
实现流程如下:
  1. canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动);
  1. 创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态;
  1. 一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动 instance;
  1. canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect。

2、Canal架构解析

架构解析基于 Canal 1.1.4 版本。这个版本首次引入了 canal-admin 组件,实现 canal 的界面化管理,生产强烈推荐该版本及以上的版本。

2.1 一次Binlog的解析过程

以 Mysql 数据库到 Kafka 的同步过程为例,从 Canal 启动到一次完整的 binlog 获取、解析、过滤、存储和发送过程如下所示
notion image
  1. canal 启动后通过 dump 协议连接上 Mysql 的主库,并从主库获取 binlog 的位点信息,然后把获取到的位点信息存到 zookeeper(通过配置文件设置),然后和主库保持长连接,等待 binlog 数据的推送。
  1. 主库发生数据变更后,把 binlog 数据推送到 canal。首先是 parse 模块开始解析数据,把二进制的 binlog 数据解析为 event 对象,然后发送到环形队列( EventTransactionBuffer ,类似于生产者的作用)。
  1. sink 模块从环形队列消费 event 对象,并进行过滤,把过滤后需要处理的 event 对象交给 store 模块。
  1. store 模块把 event 对象投放到目标存储,可以是消息队列(Kafka,RocketMQ),也可以是内存环形队列(基于 Disruptor 实现),然后使用内置的 client 客户端通过 ack 机制消费内存环形队列的对象。

2.2 Canal的整体结构

canal 包含的组件如下所示:
notion image
  • deployer模块:独立部署模块,用于 canal-server 的独立启动,包括本地配置解析、拉取远程配置、启动 canal-server。
  • server模块:canal-server 的实现逻辑,一个 canal-server 一般是一个 jvm 进程。重点关注两种 canal-server 的实现方式:内嵌型的 canalServerEmbed 和独立使用的 canalServerWithNetty。新版本中新增了直接对接 mq 的 canal-server 实现。
  • instance模块:具体实时订阅任务是由一个个 instance 组成的,每个 canal-server 中可以同时运行多个 instance。instance 由 parser、sink、store 三个重点模块组成。
  • parser模块:数据源接入,模拟 slave 协议和 master 进行交互,协议解析。parser 模块依赖于dbsyncdriver模块,使用 Disruptor 队列接收数据。
  • sink模块:将 parser 抓取到的数据,进行过滤,加工,然后发送到 store 模块进行存储。核心接口为 CanalEventSink
  • store模块:数据存储模块,类似内存模式到消息队列,本质上是一个 RingBuffer。核心接口为 CanalEventStore
  • meta模块:增量订阅&消费信息管理器,核心接口为 CanalMetaManager,主要用于记录 canal 消费到的 mysql binlog 的位置
  • client模块:项目最早的消费客户端,通过将 client 模块引入自己的项目中,然后直接消费 canal-server 获取的数据。
  • client-adapter模块:1.1.4 后新出的模块,可以独立部署为 canal-server 的消费服务端,是一个 springboot 项目。通过 SPI 机制,能够加载不同 plugins,将消费信息投递到 ES\hbase\rdb 等下游。
  • client-admin模块:1.1.4 新出的模块,可以独立部署为 canal-server 的控制台,配置 canal-server、instance 相关配置,非常好用。
  • prometheus模块:可以对接 prometheus 监控系统。
组件之间的关系如下:
notion image

2.3 deployer模块

deployer 模块主要用于独立部署 canal server。canal server 有两种使用方式:
  1. 独立部署
  1. 内嵌到应用中。
在主目录下执行mvn clean install -Dmaven.test.skip -Denv=release,在 target 目录下生成一个 canal.deployer-1.1.4.tar.gz 文件。这是一个可以独立部署的压缩包。 canal.deployer-1.1.4.tar.gz 解压后的结构如下:
deployer 模块完成的功能:
  1. 读取 canal、properties 配置文件。
  1. 启动 canal server,监听 canal client 的请求。
  1. 启动 canal instance,连接 mysql 数据库,伪装成 slave,解析 binlog。
  1. 起一个监听线程,在 canal 的运行过程中,监听配置文件的变化。

2.4 server模块

server 模块的核心接口是 CanalServer,其有2个实现类 CanalServerWithNettyCanalServerWithEmbeded
  • CanalServerWithEmbedded:应用内嵌,对 latency 和可用性要求都比较高,要求自己能 hold 住分布式相关技术。说白了我们可以不必独立部署 canal server,在应用直接使用 CanalServerWithEmbedded 直连mysql数据库。
  • CanalServerWithNetty:基于 netty 封装一层网络协议,由 canal-server 保证其可用性,采用 pull 模型。如果觉得自己的技术 hold 不住相关代码,就独立部署一个 canal server,使用 canal 提供的客户端,连接 canal server 获取 binlog 解析后数据。而 CanalServerWithNetty 是在 CanalServerWithEmbedded 的基础上做的一层封装,用于与客户端通信。Canal客户端发送的所有请求都交给 CanalServerWithNetty 处理解析,解析完成之后委派给了交给 CanalServerWithEmbedded 进行处理。因此 CanalServerWithNetty 就是一个马甲而已,CanalServerWithEmbedded 才是核心
notion image
server 模块完成的功能:
  1. 接受消费请求,包括内部消费(基于CanalServerWithEmbeded类),或者客户端消费(基于CanalServerWithNetty,但实际上会将其委派给CanalServerWithEmbeded处理。)
  1. 和客户端保持握手、心跳等操作。
  1. 运行 MQ Producer,实现内部消费,并发送 MQ 组件。

2.5 instance模块

上面我们提到 CanalServerWithNetty 封装了一层网络请求协议,将请求委派给 CanalServerWithEmbedded 处理。CanalServerWithEmbedded 会根据请求携带的 destination 参数,选择对应的 CanalInstance 来真正的处理请求,然后来到我们的核心处理模块——instance 模块
instance 模块下有三个子模块:
  • core模块:定义了CanalInstance接口,以及其抽象类子类AbstractCanalInstance
  • spring模块:提供了基于 spring 配置方式的CanalInstanceWithSpring实现,即 CanalInstance 实例的创建,通过spring配置文件来创建。
  • manager模块:提供了基于 manager 配置方式的CanalInstanceWithManager实现,即 CanalInstance 实例根据远程配置中心的内容来创建。
notion image
instance 模块主要有四个组件:
  • event parser:数据源接入,模拟 slave 协议和 master 进行交互,协议解析
  • event sink:parser 和 store 链接器,进行数据过滤,加工,分发的工作
  • event store:数据存储
  • meta manager:增量订阅/消费 binlog 元数据位置存储
notion image
前面三个模块下面会详细分析,这里主要讲一下 meta 模块。meta 模块负责记录 binlog 消费的位点消息,meta 模块的核心接口是 CanalMetaManager,主要有下面这些实现:
  • ZooKeeperMetaManager:将元数据存存储到 zk 中
  • MemoryMetaManager:将元数据存储到内存中
  • MixedMetaManager:组合 memory + zookeeper 的使用模式
  • PeriodMixedMetaManager:基于定时刷新的策略的 mixed 实现
  • FileMixedMetaManager:先写内存,然后定时刷新数据到 File
instance 模块完成的功能:
  1. core 模块把各个模块组装在一起运行,仅此而已。
  1. 如果基于本地配置,使用 spring 模块读取本地 instance 配置文件。
  1. 如果基于远程配置,使用 manager 模块读取远程 canal-admin 的配置信息。

2.6 parse模块

parse 模块是 canal 最核心的模块,核心功能是完成对 binlog 数据的抓取。
parse模块底层依赖 driver 和 dbsync 两个模块:
  • driver模块:阿里自身实现的数据驱动模块,作用类似于mysql-connector-java,只是实现了简单的dump、查询和更新等协议。
  • dbsync模块:也是网络通信等基础对象,应该是作者从其他地方复制粘贴过来的包,个人感觉可以放到 driver 模块里面。
parse 接收 binlog 数据的整个过程:
notion image
  1. 网络接收 (单线程),publish 投递到 RingBuffer
  1. 从 RingBuffe 获取事件,使用 SimpleParserStage 进行基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息)
  1. 事件深度解析 ,用 workpool 进行多线程, 使用 DmlParserStage 进行DML事件数据的完整解析
  1. SinkStoreStage 单线程投递到 store
单线程接受事件后,为什么需要一个单线程先解析一下再多线程深度解析,而不是直接多线程深度解析?我也不懂。
parse 模块完成的功能:
  1. 管理位点记录,主要实现类是 CanalLogPositionManager
  1. 实现连接源数据库的心跳检测,并实现数据库的 HA,实现类是 CanalHAController
  1. 连接数据库,接收数据库的 binlog 信息,会把所有数据并放入 Disruptor 队列中,并启动 sink 模块进行过滤。

2.7 sink模块

sink 主要对 parse 模块传过来的数据进行过加工过滤,但 sink 模块本身不包含过滤的相关代码,而是向下依赖 filter 模块,把过滤功能的实现转移到 filter 模块,作者的初衷应该是想把过滤功能做成一个通用的模块。
filter 模块为了快速地实现过滤,依赖了 aviator 组件。aviator是一个高性能、轻量级的 java 语言实现的表达式求值引擎,直接将表达式编译成 Java 字节码,交给 JVM 去执行。所以 filter 的过滤执行速度非常快。
sink 模块完成的功能:
  1. 把分库后的多个数据库的实例的数据聚集到一起。
  1. 实现过滤功能,包括库名、表名、字段名等条件过滤。

2.8 store模块

store 模块主要把 parse 模块传过来的数据进行存储,这里的存储的意思是说把处理好的可以直接消费数据放到一个内存队列里面。如果不能及时调用客户端消费或者内部消费发送到 MQ,store 的内存队列会被很快打满,然后会阻塞前面的全部流程。
store 模块的核心类是 MemoryEventStoreWithBuffer,其实现借鉴了 Disruptor 的 RingBuffer。简而言之,你可以把其当做一个环形队列。
notion image
针对这个环形队列,canal 定义了 3 类操作:PutGetAck,其中:
  • Put添加数据。event parser 模块拉取到 binlog 后,并经过 event sink 模块过滤,最终就通过 Put 操作存储到了队列中。
  • Get获取数据。canal client 连接到 canal server 后,最终获取到的 binlog 都是从这个队列中取得。
  • Ack确认消费成功。canal client 获取到 binlog 事件消费后,需要进行 Ack。你可以认为 Ack 操作实际上就是将消费成功的事件从队列中删除,如果一直不 Ack 的话,队列满了之后,Put操作就无法添加新的数据了。
对应的,我们需要使用 3 个变量来记录 Put、Get、Ack 这三个操作的位置,其中:
  • putSequence: 每放入一个数据 putSequence +1,可表示存储数据存储的总数量
  • getSequence: 每获取一个数据 getSequence +1,可表示数据订阅获取的最后一次提取位置
  • ackSequence: 每确认一个数据 ackSequence + 1,可表示数据最后一次消费成功位置
如果将 RingBuffer 拉直来看,将会变得更加直观:
notion image
putSequence、getSequence、ackSequence这3个变量要求满足的关系是:
计算当前可消费的event数量:
计算当前队列的大小(即队列中还有多少事件等待消费):
store 模块实现的功能:
  1. 最主要是实现了 MemoryEventStoreWithBuffer 环形队列,并向外提供 put,get 和 ack 三种接口:
      • put:把解析好的可供消费的数据放到队列。
      • get:client 或内部消费获取队列数据。
      • ack:client 或内部消费完之后,通过回调方式回复,删除已经消费过的数据。

3、canal解析消息全过程

未完待续
Hexo+GitHub搭建个人博客教程Mysql集群篇:Binlog解析