Canal安装部署与测试

这篇具有很好参考价值的文章主要介绍了Canal安装部署与测试。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

第一章 Canal概述

Github地址:https://github.com/alibaba/canal

1.1 简介

Canal安装部署与测试,flink,实时数仓,Canal,canal安装部署,canal整合kafka,canal实时数据采集,实时数据采集

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2 工作原理

1.2.1 MySQL主备复制原理

Canal安装部署与测试,flink,实时数仓,Canal,canal安装部署,canal整合kafka,canal实时数据采集,实时数据采集

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

1.2.2 canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流),再回放中继日志

1.3 重要版本更新说明

  1. canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
    • 整体性能测试&优化,提升了150%. #726 参考: Performance
    • 原生支持prometheus监控 #765 Prometheus QuickStart
    • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
    • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
    • 原生支持docker镜像 #801 参考: Docker QuickStart
  2. canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide

1.4 多语言

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,欢迎大家提交 pull request

  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal Python客户端: https://github.com/haozi3156666/canal-python

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力

  • 参考文档: Canal Kafka/RocketMQ QuickStart

第二章 Canal安装部署

2.1 准备

  • 查看mysql是否开启bin-log,查询语句如下:

    SHOW VARIABLES LIKE '%log_bin%'
    

    如查询出来log_bin的Value是OFF状态,则需要执行如下的开启bin-log步骤。

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    #注意,配置后需要重启msyql
    [root@hadoop01 ~]# systemctl restart mysqld
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant(如有授权用户可不执行如下命令)。

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

2.2 canal安装

https://github.com/alibaba/canal/releases 中查找 canal 最新deploy 版本的下载链接
1.下载压缩包:

[root@hadoop01 ~]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2.解压

[root@hadoop01 ~]# mkdir /usr/local/canal
[root@hadoop01 ~]# tar -zxvf /home/canal.deployer-1.1.4.tar.gz -C /usr/local/canal

3.创建测试配置直接使用自带的样例进行更改,并复制出默认配置以备后期增加库时使用,这里的每一个文件夹就代表一个数据源,多个数据源就复制出多个文件夹后在 canal.properties 的 canal.destinations 里逗号分隔进行配置:

[root@hadoop01 canal]# cp -r ./conf/example/ ./conf/test

4.修改单个数据源的配置

[root@hadoop01 canal]# vi ./conf/test/instance.properties
#修改如下
canal.instance.master.address=127.0.0.1:3306 	# 数据库服务器地址
canal.instance.dbUsername=root 	# 数据库用户名
canal.instance.dbPassword=root 	# 数据库密码
#canal.instance.filter.regex=.*\\..*  # 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
canal.instance.filter.regex=travel.dim_product1 	# 数据表过滤正则,dbName.tbName,默认的是所有库的所有表
canal.mq.topic=travel_dim_product1 		# 这个数据库存储进 Kafka 时使用的 topic

5.修改 Canal 全局设置

[root@hadoop01 canal]# vi conf/canal.properties

#修改如下
canal.destinations = test 	# 这里配置开启的 instance,具体方法上面步骤有说明

canal.serverMode = kafka 	# 更改模式,直接把数据扔进 Kafka
#canal.mq.servers = 127.0.0.1:6667
canal.mq.servers = 192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092 	# Kafka 的地址
canal.mq.batchSize = 16384 # 这里没有更改,值应该小于 Kafka 的 config/producer.properties 中 batch.size,但是 Kafka 里没设置,这里也就不更改了
#canal.mq.flatMessage = false
canal.mq.flatMessage = true 	# 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进制数据,虽然不影响,但是看起来不方便
配置文件详解:
参数名字 参数说明 默认值
canal.destinations 当前server上部署的instance列表
canal.id 每个canal server实例的唯一标识,暂无实际意义 1
canal.ip canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 无
canal.port canal server提供socket服务的端口 11111
canal.zkServers canal server链接zookeeper集群的链接信息
例子:127.0.0.1:2181,127.0.0.1:2182 无

canal.zookeeper.flush.period canal持久化数据到zookeeper上的更新频率,单位毫秒 1000
canal.file.data.dir canal持久化数据到file上的目录 …/conf (默认和instance.properties为同一目录,方便运维和备份)
canal.file.flush.period canal持久化数据到file上的更新频率,单位毫秒 1000
canal.instance.mysql.slaveId mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 1234
canal.instance.master.address mysql主库链接地址 127.0.0.1:3306
canal.instance.master.journal.name mysql主库链接时起始的binlog文件 无
canal.instance.master.position mysql主库链接时起始的binlog偏移量 无
canal.instance.master.timestamp mysql主库链接时起始的binlog的时间戳 无
canal.instance.dbUsername mysql数据库帐号 canal
canal.instance.dbPassword mysql数据库密码 canal
canal.instance.defaultDatabaseName mysql链接时默认schema 
canal.instance.connectionCharset mysql 数据解析编码 UTF-8
canal.instance.filter.regex mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)	.*\\..*

6.开启 Canal
bin/startup.sh

#因为 Canal 的脚本名称都太普通,所以没有添加到 PATH 里

[root@hadoop01 canal]# ./bin/startup.sh

7.查看日志是否有异常:

[root@hadoop01 canal]# tail -f ./logs/canal/canal.log   #server启动日志
[root@hadoop01 canal]# tail -f ./logs/test/test.log   #instance 的日志

第三章 Canal和Kafka整合测试

kafka的安装在这儿省略,大家自行安装即可…

  1. 测试,连上数据库尝试执行更改 SQL
CREATE TABLE `yq` (
  `dt` varchar(25) NOT NULL,
  `province` varchar(255) NOT NULL,
  `adds` int(11) DEFAULT '0',
  `possibles` int(11) DEFAULT '0',
  PRIMARY KEY (`dt`,`province`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
**查看 Kafka 中的数据,列出所有 topic:
kafka-topics.sh --list --zookeeper localhost:2181消费其中的数据:kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning此时应该已经可以列出创建表时的语句后期 Bug 修复:消费端 Kafka 不进数据,Canal 日志报错 org.apache.kafka.common.errors.RecordTooLargeException,认为是 Kafka 消息体大小限制造成的,需要同时修改 Kafka 与 Canal 消息体的最大限制修改 Kafka 配置,server.properties 中修改或添加配置项 message.max.bytes=100000000,producer.properties 中修改或添加配置项 max.request.size=100000000,consumer.properties 中修改或添加配置项 max.partition.fetch.bytes=100000000,重启 Kafka修改 Canal 配置,canal.properties 修改 canal.mq.maxRequestSize 参数值为 90000000,重启 Canal查看 Canal 日志是否报错 Could not find first log file name in binary log index file at… 如果报错则停止 Canal ,再删除实例配置下的 meta.dat 文件,再启动 Canal 即可**
  1. 对yq表进行update、insert、delete操作
{"data":[{"dt":"2020-7-1","province":"beijing","adds":"3","possibles":"12"}],"database":"test","es":1603647506000,"id":1,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":[{"possibles":"6"}],"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647506282,"type":"UPDATE"}
{"data":[{"dt":"2020-7-2","province":"beijing","adds":"10","possibles":"6"}],"database":"test","es":1603647607000,"id":2,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":[{"possibles":"5"}],"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647607348,"type":"UPDATE"}
{"data":[{"dt":"2020-10-26","province":"qingdao","adds":"2","possibles":"3"}],"database":"test","es":1603647656000,"id":3,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":null,"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647656168,"type":"INSERT"}
{"data":[{"dt":"2020-10-26","province":"qingdao","adds":"2","possibles":"3"}],"database":"test","es":1603647707000,"id":4,"isDdl":false,"mysqlType":{"dt":"varchar(25)","province":"varchar(255)","adds":"int(11)","possibles":"int(11)"},"old":null,"pkNames":["dt","province"],"sql":"","sqlType":{"dt":12,"province":12,"adds":4,"possibles":4},"table":"yq","ts":1603647707172,"type":"DELETE"}
  1. 关闭Canal
[root@hadoop01 canal]# ./bin/stop.sh

关闭之后,如果mysql的表数据有修改,当重启canal以后,将会把变更数据投递到kafka中!!!

注意事项

1、bin-log的存储三种Format:

mysql复制主要有三种方式:基于SQL语句的复制(statement-based replication, SBR),基于行的复制(row-based replication, RBR),混合模式复制(mixed-based replication, MBR)。对应的,binlog的格式也有三种:STATEMENT,ROW,MIXED。

① STATEMENT模式(SBR)

每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)

② ROW模式(RBR)

不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。

③ MIXED模式(MBR)

以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。

2、canal会丢失数据吗?

建议将增量数据投递到MQ中,如果canal有异常,当恢复时候会重读中继日志

3、canal的下游怎么处理?

存储可以进行实时读写处理中---流式计算 ---> 追加到流表中、选择支持upsert 模式流表中、或者选择自己维护状态
离线,可以使用canal周期落地 --- 离线

4、canal对接kafka,kafka多分区怎么办

1、单分区能保证增量数据顺序---但是丢失并行度
2、按照维度得主键进行分区  --- 增加并行处理能力

kafka是3个分区
k-0
k-1
k-2

mysql
111
112
113

111 zs k-0
111 ls k-1
113 k-2

flink消费:
3个并行度消费
k-1 先消费
k-0 消费

5、mysql多少节点、canal多少节点

如果维度表数量在50个一下,则构建3-5个canal集群即可。
50-100个维度表,则构建7-11个左右canal即可

6、canal保留原始数据和增量数据?文章来源地址https://www.toymoban.com/news/detail-606817.html

ES或者HBASE:
eid
主键_version 其它字段 是否有效 起始时间 生效时间

redis:
主键 [{},{},{}] 索引+1就是版本信息

到了这里,关于Canal安装部署与测试的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用 Docker 部署 canal 服务实现MySQL和ES实时同步

    参考 ClientAdapter: Canal的Adapter配置项目 Sync ES:Canal的Adapter中ES同步的配置项 使用 Docker 部署 canal 服务 docker canal-server canal-adapter mysql Canal(基于Docker同步mysql数据到elasticsearch) Canal部署过程中的错误 Canal 1.1.4 Canal Adapter 1.1.4 Kibana: 6.8.8 ElasticSearch: 6.4.3 由于Canal 1.1.4只能适配 Ela

    2024年02月13日
    浏览(45)
  • Spring Boot整合canal实现数据一致性解决方案解析-部署+实战

    🏷️ 个人主页 :牵着猫散步的鼠鼠  🏷️ 系列专栏 :Java全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   1.前言 2.canal部署安装 3.Spring Boot整合canal 3.1数据库与缓存一致性问题概述 3.2 整合canel 4.总结 canal [kə\\\'næl]  ,译意为水道/管道/沟渠,主要用途是 基于

    2024年03月19日
    浏览(54)
  • Canal实时同步MySQL数据到ES

    canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业

    2024年02月04日
    浏览(61)
  • SpringBoot整合Canal+RabbitMQ监听数据变更

    需求 步骤 环境搭建 整合SpringBoot Canal实现客户端 Canal整合RabbitMQ SpringBoot整合RabbitMQ   我想要在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。 经过调研发现,使用Canal来监听MySQL的binlog变化可

    2024年02月11日
    浏览(46)
  • canal实现mysql数据实时同步到es

    最近有一个需求:原有一些mysql数据,这些数据量很大,且包含文本信息,需要对其进行搜索,这时如果使用mysql的like来匹配,效率会很低,且很可能影响整个系统的运行,经过和同事的讨论,最终决定使用es来做搜索。 但是源数据有很多关联关系,搜索的时候也会带上这些

    2024年02月16日
    浏览(64)
  • flink cdc 、 canal 、maxwell 的区别

    目录 读取数据的格式不同   (CDC是自定义的数据类型  在这里就不进行展示了,主要是展示一下Maxwell和Canal的区别) 1.添加的区别         1.1 Canal 1.2 Maxwell 2.修改的区别 2.1Canal 2,2Maxwell 3.删除的区别 3.1 Canal 3.2 Maxwell Flink CDC :         DataStream:                 优点:多库多

    2023年04月17日
    浏览(41)
  • SpringBoot整合Canal实现数据同步到ElasticSearch

    canal 译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal原理就是伪装成mysql的从节点,从而订阅master节点的binlog日志 Canal原理: canal模拟mysql slave的交

    2024年02月06日
    浏览(46)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

    Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步数据库

    2024年02月03日
    浏览(52)
  • SpringBoot快速整合canal1.1.5(TCP模式)

    SpringBoot快速整合canal1.1.5(TCP模式) 安装并配置MySQL主从⭐ 1:Docker安装MySQL8.0.28 2:创建目录: 3:编写my.cnf文件: 内容如下:(注意:把binlog-do-db的值修改成你需要canal监听的数据库名称,如果需要监听多个数据库,一定要在下面写多个binlog-do-db,而不是用“,”分隔) 4:启

    2024年02月06日
    浏览(39)
  • Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器

    目录 一. 前言 二. Canal 简介和使用场景 2.1. Canal 简介 2.2. Canal 使用场景 三. Canal Server 设计 3.1. 整体设计 3.2. EventParser 设计 3.3. CanalLogPositionManager 设计 3.4. CanalHAController 类图设计 3.5. EventSink 类图设计和扩展 3.6. EventStore 类图设计和扩展 3.7. MetaManager 类图设计和扩展 四. Can

    2024年01月25日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包