Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

这篇具有很好参考价值的文章主要介绍了Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。
注:本文使用的Canal为 v1.1.7

一、Mysql数据库开启bin_log

  • 先查看目标数据库是否开启bin_log
SHOW VARIABLES LIKE 'log_bin'

如结果中,log_bin的值为OFF则未开启,为ON则已开启。
Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

  • 如未开启,可编辑Mysql配置文件:/etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可

重启MySQL ,再次通过上一步查看配置是否生效。

二、数据库创建新用户

  • 创建专用于数据同步的新用户
-- 创建一个新用户,名称可自行定义
create user canal@'%' IDENTIFIED by 'canal';
-- 为新用户授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
-- 刷新缓存中的用户数据
FLUSH PRIVILEGES;

三、配置RabhitMQ

以下使用的名称均可自行定义,保证唯一即可

1. 添加交换机

Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

2. 添加队列

Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

3. 绑定交换机与队列,设置 Routing key

Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

四、下载、配置、运行Canal(windows环境)

1. 下载服务端

  • 可到以下地址下载所需版本的包:github-alibaba-canal
    本文使用较新的 v1.1.7
    Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

  • 选择下载 canal.deployer-1.1.7.tar.gz
    Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

2. 配置

  • 解压下载包,获得如下文件。
    Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库
  • 编辑:conf\canal.properties(仅列出需要修改的配置项)
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
##################################################
######### 		    RabbitMQ	     #############
##################################################
# host 无需添加端口号
rabbitmq.host = 192.168.0.2
# 填写 / 即可
rabbitmq.virtual.host = /
# RabbitMQ的用户名、密码
rabbitmq.username = admin
rabbitmq.password = 123456
# 上文配置的交换机(exchange)名称:Name
rabbitmq.exchange = canal.exchange
# 交换机类型:Type
rabbitmq.deliveryMode = direct

# 以下两个字段为自行添加,否则会报空指针异常
# 队列(queue)名称:Name
rabbitmq.queue = canal.queue
# 绑定队列-交换机时的路由秘钥:Routing key
rabbitmq.routingKey = canal.routing.key
  • 编辑:conf\example\instance.properties(仅列出需要修改的配置项)
# 目标数据库地址
canal.instance.master.address=192.168.0.1:3306
# 目标数据库用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123

# 表过滤正则表达式(按需修改)
# 全库全表 : .*\\..*
# 指定库所有表:  库名\..*   例:test\..*
# 单表:  库名.表名  例:test.user
# 多规则组合使用:  库名1\..*,库名2.表名1,库名3.表名2 (逗号分隔)  例 test\..*,test2.user1,test3.user2 (逗号分隔)
canal.instance.filter.regex=.*\\..*
# canal.instance.filter.regex=project.sys_user,project.sys_role

3. 运行

windows环境下直接运行bin\startup.bat,linux环境下执行bin\startup.sh
执行启动脚本后,查看日志信息logs\canal\canal.log,出现如下信息,表示启动成功。

[main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

五、测试

对监听的数据库表做修改操作,至RabbitMQ控制台的队列中查看是否插入消息。
如下,即成功插入实时操作数据。
Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

六、项目中监听处理

  • 创建一个maven项目

Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理,mysql,rabbitmq,数据库

  • pom.xml中引入spring-boot-starter-amqp依赖,此包集成了对RabbitMQ的支持。
	<!-- RabbitMQ 集成支持 -->
	<dependency>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
	
	<!-- fastjson 解析数据 -->
	<dependency>
	    <groupId>com.alibaba</groupId>
	    <artifactId>fastjson</artifactId>
	    <version>2.0.9.graal</version>
	</dependency>
  • 修改配置文件application.yml(此处已按个人偏好,文件类型改为yaml),配置RabbitMQ。
spring:
  rabbitmq:
    host: 192.168.0.2
    port: 5672
    username: admin
    password: 123456
  • binLog数据实体类BinLogEntity
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Data
public class BinLogEntity {
    /**
     * 数据库
     */
    private String database;

    /**
     * 表
     */
    private String table;

    /**
     * 操作类型
     */
    private String type;

    /**
     * 操作数据
     */
    private JSONArray data;

    /**
     * 变更前数据
     */
    private JSONArray old;

    /**
     * 主键名称
     */
    private JSONArray pkNames;

    /**
     * 执行sql语句
     */
    private String sql;
    
    private Long es;
    private String gtid;
    private Long id;
    private Boolean isDdl;
    private JSONObject mysqlType;
    private JSONObject sqlType;
    private Long ts;

    public <T> List<T> getData(Class<T> clazz) {
        if (this.data == null || this.data.size() == 0) {
            return null;
        }
        return this.data.toJavaList(clazz);
    }

    public <T> List<T> getOld(Class<T> clazz) {
        if (this.old == null || this.old.size() == 0) {
            return null;
        }
        return this.old.toJavaList(clazz);
    }

    public List<String> getPkNames() {
        if (this.pkNames == null || this.pkNames.size() == 0) {
            return null;
        }
        List<String> pkNames = new ArrayList<>();
        for (Object pkName : this.pkNames){
            pkNames.add(pkName.toString());
        }
        return pkNames;
    }

    public Map<String, String> getMysqlType() {
        if(this.mysqlType == null){
            return null;
        }
        Map<String, String> mysqlTypeMap = new HashMap<>();
        this.mysqlType.forEach((k, v) -> {
            mysqlTypeMap.put(k, v.toString());
        });
        return mysqlTypeMap;
    }

    public Map<String, Integer> getSqlType() {
        if(this.sqlType == null){
            return null;
        }
        Map<String, Integer> sqlTypeMap = new HashMap<>();
        this.sqlType.forEach((k, v) -> {
            sqlTypeMap.put(k, Integer.valueOf(v.toString()));
        });
        return sqlTypeMap;
    }
}
  • 操作数据实体类
@Data
public class User implements Serializable {
	private static final long serialVersionUID = 1L;

	/**
	 * ID
	 */
	private Long id;

	/**
	 * 姓名
	 */
	private String name;

	/**
	 * 年龄
	 */
	private Integer age;

	/**
	 * 电话
	 */
	private String phone;

}
  • 监听类CanalListener
import com.alibaba.fastjson.JSON;
import com.example.canalclient.entity.BinLogEntity;
import com.example.canalclient.entity.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * 监听数据库数据变化时RabbitMQ发送的信息
 */
@Component
public class CanalListener {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
	public void handleDataChange(@Payload Message message) {
        // 获取消息内容
        String content = new String(message.getBody(), StandardCharsets.UTF_8);
        // 反序列化
        BinLogEntity binLog = JSON.parseObject(content, BinLogEntity.class);
        // 获取操作数据
        User user = binLog.getData(User.class).get(0);
        User oldUser = binLog.getOld(User.class).get(0);

        System.out.println("数据库:" + binLog.getDatabase());
        System.out.println("表:" + binLog.getTable());
        System.out.println("操作类型:" + binLog.getType());
        System.out.println("主键:" + JSON.toJSONString(binLog.getPkNames()));
        System.out.println("数据:" + JSON.toJSONString(User));
        System.out.println("原数据:" + JSON.toJSONString(User));
        System.out.println("MysqlType:" + JSON.toJSONString(binLog.getMysqlType()));
    }
}
  • 打印结果(修改操作)
数据库:project
表:sys_user
操作类型:UPDATE
主键:["id"]
数据:{
	"id": 1,
	"name": "张三",
	"age": 21,
	"phone": 13333333333
}
原数据:{
	"age": 20,
	"phone": 12222222222
}
MysqlType:{
	"id": "bigint unsigned",
	"name": "varchar(50)",
	"age": "int(3) unsigned",
	"phone": "varchar(50)"
}

至此,已实现对目标数据库实时操作数据进行监听,可根据不同的操作类型,采取相应的业务处理。

七、参考文章

Canal+Msql+RabbitMq数据库同步配置,看这一篇就够了

使用canal同步mysql数据库信息到RabbitMQ

Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑文章来源地址https://www.toymoban.com/news/detail-846547.html

到了这里,关于Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

    canal基于MySQL数据库增量日志解析,提供增量数据订阅和消费,是阿里开源CDC工具,它可以获取MySQL binlog数据并解析,然后将数据变动传输给下游。基于canal,可以实现从MySQL到其他数据库的实时同步 MySQL主备复制原理 MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫

    2023年04月08日
    浏览(34)
  • Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器

    目录 一. 前言 二. Canal 简介和使用场景 2.1. Canal 简介 2.2. Canal 使用场景 三. Canal Server 设计 3.1. 整体设计 3.2. EventParser 设计 3.3. CanalLogPositionManager 设计 3.4. CanalHAController 类图设计 3.5. EventSink 类图设计和扩展 3.6. EventStore 类图设计和扩展 3.7. MetaManager 类图设计和扩展 四. Can

    2024年01月25日
    浏览(40)
  • MySQL如何实时同步数据到ES?试试阿里开源的Canal

    前几天在网上冲浪的时候发现了一个比较成熟的开源中间件——  Canal  。在了解了它的工作原理和使用场景后,顿时产生了浓厚的兴趣。今天,就让我们跟随我的脚步,一起来揭开它神秘的面纱吧。 目录 前言 简介  工作原理  MySQL主备复制原理 canal 工作原理 Canal架构  C

    2024年02月20日
    浏览(33)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

    Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步数据库

    2024年02月03日
    浏览(40)
  • 实时同步ES技术选型:Mysql+Canal+Adapter+ES+Kibana

    基于之前的文章,精简操作而来 让ELK在同一个docker网络下通过名字直接访问 Ubuntu服务器ELK部署与实践 使用 Docker 部署 canal 服务实现MySQL和ES实时同步 Docker部署ES服务,canal全量同步的时候内存爆炸,ES/Canal Adapter自动关闭,CPU100% 2.1 新建mysql docker 首先新建数据库的docker镜像

    2024年02月11日
    浏览(32)
  • Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch

    本篇文章在Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性-CSDN博客 基础上使用canal将mysql数据实时同步到Elasticsearch。 公共包 实体类Sku @Column注解 用来标识实体类中属性与数据表中字段的对应关系 name 定义了被标注字段在数据库表中所对应字段的名称;由

    2024年02月03日
    浏览(30)
  • 基于Canal与Flink实现数据实时增量同步(一)

    vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: kms-1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ s p r in g . d

    2024年04月13日
    浏览(37)
  • Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性

    目录   1. 背景 2. Windows系统安装canal 3.Mysql准备工作 4. 公共依赖包 5. Redis缓存设计 6. mall-canal-service   canal [kə\\\'næl] ,译意为水道/管道/沟渠,主要用途是 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存

    2024年02月03日
    浏览(44)
  • 基于Canal与Flink实现数据实时增量同步(一),计算机毕设源码要提交吗

    配置修改 修改conf/example/instance.properties,修改内容如下: canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = kms-1.apache.com:3306 #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.mq.topic

    2024年04月12日
    浏览(43)
  • Canal+Kafka实现Mysql数据同步

    canal [kə\\\'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

    2024年02月12日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包