type
status
date
slug
summary
tags
category
icon
password
AI summary

项目介绍

之前写过一篇文章整体介绍了一下canal的核心组件
文中是这么描述canal-client-adapter的
canal-client-adapter,也属于canal-client,可以把它理解为官方提供的一些常见的异构数据同步通道的实现,比如同步到elasticsearch、hbase、mongodb、mysql、redis、kafka等。所以和canal-client一样,它支持TCP和MQ的消费模式。 当然如果预设的满足不了,那就还是需要自定义了,可以选择基于canal-client-adapter的框架自定义扩展,也可以自己消费对应的MQ或者store去实现。
canal-client-adapter是canal官方提供的整个数据异构工程的最后一个环节。通过抽象整个同步流程,屏蔽不同异构数据源的api,构建了一套异构数据同步的框架。

项目结构

全篇文章基于canal-1.1.5版本分析
我们先来看看这个项目的模块分层
notion image
上面这些模块,主要分成三类:
  1. launcher——启动模块,里面主要是启动类加一些核心配置
  1. common——公共模块
  1. 其他——都是具体的adapter实现模块,比如es7x,就是具体用es7x的客户端同步到es里

启动流程分析

启动类还是最直观,最容易跟踪的东西。那我们就从launcher模块入手
notion image
这里面和启动流程相关的类有:
  1. BootstrapConfiguration
  1. CanalAdapterService
  1. ApplicationConfigMonitor
  1. CanalAdapterApplication
下面我们按照从易到难的顺序来分析:CanalAdapterApplication > BootstrapConfiguration > ApplicationConfigMonitor > CanalAdapterApplication

CanalAdapterApplication

只是个单纯的SpringBoot启动类,我们可以通过启动这个类在本地调试CanalClientAdapter

BootstrapConfiguration

canal-adapter支持两种配置模式:
  1. 从mysql读取配置
  1. 从配置文件读取配置
而这个类主要就是用来从数据库加载远程配置文件的:
可以看到,上面远程加载配置文件的流程总共分成三个核心动作(line 12-14):
  1. loadRemoteConfig——从mysql下载application.yml 主配置文件到本地磁盘
  1. loadRemoteAdapterConfigs——从mysql下载adapter相关配置文件到本地磁盘
  1. startMonitor——启动监听,定时触发loadRemoteConfigloadRemoteAdapterConfigs
loadRemoteConfigloadRemoteAdapterConfigs的具体代码逻辑也比较简单。

loadRemoteConfig

通过下列sql去表canal_config找一条id为2的特定记录,这条记录的content就是canal-adapter的主配置文件application.yml

loadRemoteAdapterConfigs

adapter的相关配置是记录在canal_adapter_config表里的,这张表每一条记录就代表一个adapter配置文件
因为记录条数可能会比较多,所以这里的查询略有优化。首先会通过一个简单查询(不包括content配置文件字段),然后根据结果和本地的配置文件做modified_time的比对,如果发现有变更再会去mysql拉取有变更的记录的content
上面从远程下载的两种配置文件非常的重要,这里分别介绍一下:

application.yml

主配置文件主要用来配置:
  1. 消费模式(直连canal还是通过消息队列)
  1. 订阅关系(topic、消费组、消费方式(或者叫异构模式))
  1. 数据源定义(主要是消费消息时用来做一些数据查询转换以及etl时的数据源)

adapter.yml

adapter相关的配置文件。主要用来定义数据的清洗逻辑、转换逻辑
还有一点需要注意,该类是通过下面这个配置文件注入的
这里用了比较取巧的方式,为了让这个类更早的被初始化,然后去加载配置。 并且,还要把应用里的application.yml等配置文件都删掉才能生效,不然会直接追加到本地配置文件application.yml
下图是从官方repo里介绍“从远程加载配置”的用法:
notion image

ApplicationConfigMonitor

这个类如它的名字一样,主要是用来监听application.yml配置文件变更的
是不是感觉这个和上面的RemoteConfigLoader.startMonitor()有一点相似和重复? 其实并没有,它俩承担了不同的职能:
  1. RemoteConfigLoader.startMonitor()是负责把mysql里的配置下载到本地磁盘上
  1. ApplicationConfigMonitor是负责监听本地磁盘上文件的变化,进而触发应用内的相关对象的变化
另外,这个类只监听了application.yml,adapter的配置文件监听会在后面看到

CanalAdapterService

这可能是本篇最复杂的一个启动类讲解了。以下的代码片段都做过一些简化处理,只保留了和流程相关的部分,方便大家理解。
由于我们这里主要关注的是启动流程,会选择性忽略一些类加载、SPI相关代码的分析。 上述代码间写了一些注释,可以辅助理解,这里再做一些说明:
  1. 首先,因为核心逻辑主要围绕application.yml这个配置文件,所以你需要对这个配置文件的结构有一定的了解,主要了解canalAdapters这个配置项 参考链接
  1. 看完配置项之后我再大概解释一下,其实canalAdapters有点像配置MQ的消费组一样,对于每个canalAdapter,你需要配置
    1. 1个需要监听的topic
    2. 1-n个消费组标识groupId
    3. 每个消费组内可以配置1-n个串行消费动作outerAdapters
  1. 同一个topic可以配置多个消费组,每个消费组可以配置多个消费动作
  1. 每一个消费组会生成一个AdapterProcessor,会调用每个AdapterProcessor的start方法

AdapterProcessor

下面分析核心类AdapterProcessor,先看构造函数
既然看到CanalMsgConsumer了,我们简单看下它的接口定义,会更容易理解:
看着这几个接口定义,应该大致都能猜到消费的流程了,不急,我们还是耐着性子看代码:
AdapterProcessor的start方法,创建并启动了一个线程,核心逻辑process
线程里的逻辑很简单:
  1. 向mq订阅对应的topic
  1. 去mq里拉取消息
  1. 处理消息
  1. 向mq ack消息
  1. 重复第2步
对应CanalMsgConsumer的具体实现我们就不看了,这里重点再看下处理消息的逻辑(writeOut):
如果你仔细看到了这里,我想你应该能发现,上面这段代码其实不用这么复杂,因为canalOuterAdapters的长度永远都是1,不存在并行的场景,直接串行执行组内适配器即可:
再看看batchSync的代码:
支持了分批同步,上面的分支逻辑还算清晰:
  1. 如果总消息数都没达到1个批次的数量,那么直接同步
  1. 如果总消息数超过了1个批次的数量,那么就1批批的做同步
    1. 如果最终还剩余不到1批次的数量,那么再同步一次
好了,核心的消息逻辑就分析完了,adapter.sync跟具体的实现挂钩了
notion image
设计到具体的实现逻辑,这块我们后面再单独写。

总结

本篇文章重点分析了canal-adapter的启动流程。并且推荐以远程配置的方式运行canal-adapter。在这里再总结一下整个启动流程:
  1. 连接mysql加载主配置文件和adapter相关配置文件
  1. 启动定时器轮询配置文件,发现有变化则下载到本地
  1. 启动定时器监听本地配置文件,发现有变化则触发应用读取文件并reload相应组件
  1. 针对于每一个topic / instance,可配置多个消费组,每个消费组可以配置一组消费单元,组内消费单元串行执行
  1. 每个消费组只有一个线程去拉取消息,但是可以通过多实例来控制线程拉取的topic的队列数来达到水平扩展的效果
Relate Posts
使用多值索引(Multi-Value Index)导致Canal同步异常
Lazy loaded image
如何解析离线binlog文件
Lazy loaded image
线上问题分析——canal-adapter数据同步不全问题排查
Lazy loaded image
canal-adapter插件式架构解析
Lazy loaded image
刷一张亿级表带来的思考
Lazy loaded image
Canal解析binlog文件的设计缺陷
Lazy loaded image
canal-adapter插件式架构解析mysql流式查询中的一个坑
Loading...
黑微狗
黑微狗
一只普通的干饭汪🍚
Latest posts
RocketMQ 4.6.0 Message Trace 功能异常排查
2025-4-8
browser-use 项目核心原理
2025-3-28
关于怎么搭建一个这样的blog
2025-3-28
关于怎么给blog搞一个自定义的域名
2025-3-28
Excel导入需求升级——支持内嵌图片导入
2025-3-28
mysql流式查询中的一个坑
2025-3-28