type
status
date
slug
summary
tags
category
icon
password
AI summary
项目介绍
之前写过一篇文章整体介绍了一下canal的核心组件
文中是这么描述canal-client-adapter的
canal-client-adapter,也属于canal-client,可以把它理解为官方提供的一些常见的异构数据同步通道的实现,比如同步到elasticsearch、hbase、mongodb、mysql、redis、kafka等。所以和canal-client一样,它支持TCP和MQ的消费模式。 当然如果预设的满足不了,那就还是需要自定义了,可以选择基于canal-client-adapter的框架自定义扩展,也可以自己消费对应的MQ或者store去实现。
canal-client-adapter是canal官方提供的整个数据异构工程的最后一个环节。通过抽象整个同步流程,屏蔽不同异构数据源的api,构建了一套异构数据同步的框架。
项目结构
全篇文章基于canal-1.1.5版本分析
我们先来看看这个项目的模块分层

上面这些模块,主要分成三类:
- launcher——启动模块,里面主要是启动类加一些核心配置
- common——公共模块
- 其他——都是具体的adapter实现模块,比如es7x,就是具体用es7x的客户端同步到es里
启动流程分析
启动类还是最直观,最容易跟踪的东西。那我们就从launcher模块入手

这里面和启动流程相关的类有:
- BootstrapConfiguration
- CanalAdapterService
- ApplicationConfigMonitor
- CanalAdapterApplication
下面我们按照从易到难的顺序来分析:CanalAdapterApplication > BootstrapConfiguration > ApplicationConfigMonitor > CanalAdapterApplication
CanalAdapterApplication
只是个单纯的SpringBoot启动类,我们可以通过启动这个类在本地调试CanalClientAdapter
BootstrapConfiguration
canal-adapter支持两种配置模式:
- 从mysql读取配置
- 从配置文件读取配置
而这个类主要就是用来从数据库加载远程配置文件的:
可以看到,上面远程加载配置文件的流程总共分成三个核心动作(line 12-14):
loadRemoteConfig
——从mysql下载application.yml
主配置文件到本地磁盘
loadRemoteAdapterConfigs
——从mysql下载adapter相关配置文件到本地磁盘
startMonitor
——启动监听,定时触发loadRemoteConfig
和loadRemoteAdapterConfigs
loadRemoteConfig
和loadRemoteAdapterConfigs
的具体代码逻辑也比较简单。loadRemoteConfig
通过下列sql去表canal_config找一条id为2的特定记录,这条记录的content就是canal-adapter的主配置文件
application.yml
loadRemoteAdapterConfigs
adapter的相关配置是记录在canal_adapter_config表里的,这张表每一条记录就代表一个adapter配置文件
因为记录条数可能会比较多,所以这里的查询略有优化。首先会通过一个简单查询(不包括content配置文件字段),然后根据结果和本地的配置文件做modified_time的比对,如果发现有变更再会去mysql拉取有变更的记录的content
上面从远程下载的两种配置文件非常的重要,这里分别介绍一下:
application.yml
主配置文件主要用来配置:
- 消费模式(直连canal还是通过消息队列)
- 订阅关系(topic、消费组、消费方式(或者叫异构模式))
- 数据源定义(主要是消费消息时用来做一些数据查询转换以及etl时的数据源)
adapter.yml
adapter相关的配置文件。主要用来定义数据的清洗逻辑、转换逻辑
还有一点需要注意,该类是通过下面这个配置文件注入的
这里用了比较取巧的方式,为了让这个类更早的被初始化,然后去加载配置。
并且,还要把应用里的
application.yml
等配置文件都删掉才能生效,不然会直接追加到本地配置文件application.yml
下图是从官方repo里介绍“从远程加载配置”的用法:

ApplicationConfigMonitor
这个类如它的名字一样,主要是用来监听
application.yml
配置文件变更的是不是感觉这个和上面的
RemoteConfigLoader.startMonitor()
有一点相似和重复?
其实并没有,它俩承担了不同的职能:RemoteConfigLoader.startMonitor()
是负责把mysql里的配置下载到本地磁盘上
ApplicationConfigMonitor
是负责监听本地磁盘上文件的变化,进而触发应用内的相关对象的变化
另外,这个类只监听了
application.yml
,adapter的配置文件监听会在后面看到CanalAdapterService
这可能是本篇最复杂的一个启动类讲解了。以下的代码片段都做过一些简化处理,只保留了和流程相关的部分,方便大家理解。
由于我们这里主要关注的是启动流程,会选择性忽略一些类加载、SPI相关代码的分析。
上述代码间写了一些注释,可以辅助理解,这里再做一些说明:
- 首先,因为核心逻辑主要围绕application.yml这个配置文件,所以你需要对这个配置文件的结构有一定的了解,主要了解canalAdapters这个配置项 参考链接
- 看完配置项之后我再大概解释一下,其实canalAdapters有点像配置MQ的消费组一样,对于每个canalAdapter,你需要配置
- 1个需要监听的topic
- 1-n个消费组标识groupId
- 每个消费组内可以配置1-n个串行消费动作outerAdapters
- 同一个topic可以配置多个消费组,每个消费组可以配置多个消费动作
- 每一个消费组会生成一个AdapterProcessor,会调用每个AdapterProcessor的start方法
AdapterProcessor
下面分析核心类AdapterProcessor,先看构造函数
既然看到CanalMsgConsumer了,我们简单看下它的接口定义,会更容易理解:
看着这几个接口定义,应该大致都能猜到消费的流程了,不急,我们还是耐着性子看代码:
AdapterProcessor的start方法,创建并启动了一个线程,核心逻辑
process
:线程里的逻辑很简单:
- 向mq订阅对应的topic
- 去mq里拉取消息
- 处理消息
- 向mq ack消息
- 重复第2步
对应CanalMsgConsumer的具体实现我们就不看了,这里重点再看下处理消息的逻辑(writeOut):
如果你仔细看到了这里,我想你应该能发现,上面这段代码其实不用这么复杂,因为canalOuterAdapters的长度永远都是1,不存在并行的场景,直接串行执行组内适配器即可:
再看看batchSync的代码:
支持了分批同步,上面的分支逻辑还算清晰:
- 如果总消息数都没达到1个批次的数量,那么直接同步
- 如果总消息数超过了1个批次的数量,那么就1批批的做同步
- 如果最终还剩余不到1批次的数量,那么再同步一次
好了,核心的消息逻辑就分析完了,adapter.sync跟具体的实现挂钩了

设计到具体的实现逻辑,这块我们后面再单独写。
总结
本篇文章重点分析了canal-adapter的启动流程。并且推荐以远程配置的方式运行canal-adapter。在这里再总结一下整个启动流程:
- 连接mysql加载主配置文件和adapter相关配置文件
- 启动定时器轮询配置文件,发现有变化则下载到本地
- 启动定时器监听本地配置文件,发现有变化则触发应用读取文件并reload相应组件
- 针对于每一个topic / instance,可配置多个消费组,每个消费组可以配置一组消费单元,组内消费单元串行执行
- 每个消费组只有一个线程去拉取消息,但是可以通过多实例来控制线程拉取的topic的队列数来达到水平扩展的效果
- Author:黑微狗
- URL:https://blog.hwgzhu.com/article/canal-client-adapter-core-code-bootstrap
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts