Java开发 - Canal的基本用法

这篇具有很好参考价值的文章主要介绍了Java开发 - Canal的基本用法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

今天给大家带来的是Canal的基本用法,Canal在Java中常被我们用来做数据的同步,当然不是MySQL与MySQL,Redis与Redis之间了,如果是他们,那就好办了,我们可以直接通过配置来完成他们之间的主从、主主,级联等的同步,为什么要用Canal呢?主要是为了完成MySQL与Redis、MySQL与ES之间的数据同步,其本质是同步的过程中降低代码的耦合度,否则我们完全可以通过代码分别往几种不同的存储方存储数据。

认识Canal

什么是Canal

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

下面这张图可以代表Canal的用途,就染我们来一起瞻仰一下:

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划

在看到这张图后,我们要感谢开发者的付出,提供给我们这么好的工具,目前来说,很多公司做数据同步都是采用的这种方式,可以通过Canal分别向MySQL,ES里同步数据。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

基本原理

Canal的实现主要利用了MySQL主从复制的原理,细分如下:

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

也就是说,Canal将自己伪装成一个MySQL的从库,像其他的Slava一样,向Master发送dump 协议,MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ),canal 解析 binary log 对象(原始为 byte 流)。

Canal准备

第一次接触Canal的小伙伴点击下面链接下载Canal:

Releases · alibaba/canal · GitHub

不要使用太新的版本,我们就用1.1.4的版本: 

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划

下载完成之后放在一个英文路径下,我们改下文件夹的名字canal,下有四个文件夹:

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

MySQL配置

这里,我们不需要去配置MySQL的主从,如果你想了解,不妨去看这篇博客:

Java开发 - MySQL主从复制初体验

这里有你想要的主从配置,和对主从配置的一些心得体会。

在此处,我们只需要开启一个MySQL服务,设置一个连接的用户和密码,整体上和配置MySQL主从的步骤差不多,因为本质上也是要把Canal配置成MySQL的Slava的。

MySQL服务开启了吧?那么登陆MySQL服务,我们先来创建并授权一个用户.

创建用户:

CREATE USER 'canal'@'%' IDENTIFIED WITH 'mysql_native_password' BY '123456';

mysql8.0和5.x其中一个改动就是加密认证方式发生改变,这个在上面提到的MySQL主从复制里有提到,caching_sha2_password是8.0, mysql_native_password是5.x,canal我们这里都采用mysql_native_password的方式创建密码。

远程授权: 

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'  WITH GRANT OPTION;

刷新权限:

FLUSH PRIVILEGES;

修改my.cnf文件,这个根据自己mysql安装位置的路径去找,但似乎这个文件大多情况是不存在的,所以我们直接在etc目录下创建一个用就行,实在害怕,可以运行如下命令查看my.cnf的默认运行位置:

 mysql --help | grep 'my.cnf'

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

所以在默认路径下:/usr/local/Cellar/mysql/版本号/ ,此处没有etc文件,自己手动创建吧,不要怂,接着:

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划

进入etc文件,在这里运行:

vim my.cnf

输入:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 不要和canal的slaveId重复即可
server_id=1

 退出并保存,然后重启mysql。

检查mysql的binlog是否开启:

show variables like 'log_bin';

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

已开启。

检查binlog_format:

show variables like "%binlog_format%";

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

显示ROW,代表我们设置生效。

检查server_id:

show variables like "%server_id%";

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

我们设置的1,已生效。 

查看当前正在写入的binlog文件:

show master status;

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

我们主要看的就是这两个参数,记住,到此为止,不要再动数据库的任何东西,否则这两个数据会改变,对我们配置canal会有影响。 上面的两个参数,我们在稍后配置canal的时候需要。

额。。。。。不过,这俩参数其实可以不用设置,不设置就代表从最新的地方开始同步,博主已经试过了,没问题。

Canal配置

我们打开刚刚下载的canal文件夹,打开这个路径下的文件:conf/example/instance.properties:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=157
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

我们需要改的核心参数暂时不多,如下:

canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=157

canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

其他的暂时先不用改,后续将到实际应用的时候会讲,这几个参数不用博主说大家也应该知道什么意思了吧?保存一下。

现在我们来启动canal,canal的启动很简单,打开一个命令行工具,直接把bin/startup.sh文件拖进去回车就可以了,方式不固定:

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

命令行输出了一大段内容,但我们不知道canal启动成功了没,我们来看下:

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

通过jps可以看到CanalLauncher的进程号,看来应该是没问题的。 

单纯的Canal监听测试

下面我们创建一个最简单的Spring Boot工程,过程就不赘述了:

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划

首先我们引入依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

版本号要和我们使用的一致。  

添加配置:

canal:
  serverAddress: 127.0.0.1
  serverPort: 11111
  instance:
    - example

在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet(),切记,这里只是监听,并不是真正项目上使用,不要照搬,此处知识单传让大家看到canal监听的效果:

package com.codingfire.canal.Client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements InitializingBean {
    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                System.out.println(message.getEntries().size());
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("----------------");
                    //如果有数据,处理数据
                    //遍历entries,单条解析
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        //获取表名
                        String tableName = entry.getHeader().getTableName();
                        //获取类型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        //判断entry类型是否为ROWDATA类型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //获取当前事件操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //获取数据集
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            //遍历
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改变前数据
                                JSONObject jsonObjectBefore = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改变后数据
                                JSONObject jsonObjectAfter = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("当前操作类型为:"+entryType);
                        }
                    }
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

下面,就到了最激动人心的时刻,请运行我们的Spring Boot工程:

Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

看到这里,就代表启动成功了,下面,我们连接数据库:

mysql -uroot -p123456

随便你是哪个用户连接的都行,没有数据库,你就创建新的数据库,如果已经有了,那么你直接操作里面的数据库表即可,博主目前有一个canal数据库,我们就用这个数据库:

use canal;

博主里面有一张用户表,操作里面的表:

insert into user value(null ,'小明','123456',20,'13812345678');

现在查看控制台有没有监听到数据库变化:

 Java开发 - Canal的基本用法,Java之微服务简单上手系列,canal基础,canal基础配置,canal同步,canal数据同步,canal,原力计划 

可以看到控制台已经打印出了我们刚刚操作的SQL,测试成功。

注意:这里只是监控,并不是真实使用场景,只是让大家直观看到SQL语句被监听到的场景,实际应用中,我们会结合MQ来使用,但不在这篇讲解。 

结语

这篇博客只是canal 的基本配置和监听机制的讲解,旨在帮助大家了解canal的工作方式,在下一篇博客中,我们将结合MQ来做数据的同步,所以大家也不要着急,咱们慢慢来,一步一给脚印,一定要把基础知识学扎实,canal的配置相较于MySQL的主从还是很相似的,也比较简单,主要都是配置项,所以更需要我们细心,不要出错,否则一个参数的错误都是导致系统无法正常运行。好了,咱们下篇再见。文章来源地址https://www.toymoban.com/news/detail-516108.html

到了这里,关于Java开发 - Canal的基本用法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Android : Room 数据库的基本用法 —简单应用_一_入门

    Android Room 是 Android 官方提供的一个持久性库,用于在 Android 应用程序中管理数据库。它提供了一个简单的 API 层,使得使用 SQLite 数据库变得更加容易和方便。 以下是 Android Room 的主要特点: 对象关系映射 (ORM):Room 允许您将 Java 或 Kotlin 对象映射到数据库表中。您可以定义数

    2024年04月09日
    浏览(87)
  • 瑞芯微RK3588开发板的固件烧录完整教程(简单好上手)

    ​​​​​​​本期技术干货内容分享嵌入式开发板固件烧录教程,以英码嵌入式开发板EVM3588为例,该发板搭载的是瑞芯微RK3588平台,烧录方式采用最常用的USB_OTG烧录,简单又方便! 开发环境 主机:Ubuntu 20.04 开发板:英码科技EVM3588开发板 烧录工具:RKDevTool_Release_v2.92.zi

    2024年02月11日
    浏览(48)
  • Git 管理工具 SourceTree 的使用(上手简单,不熟悉git命令的开发者必用)

    目录 一、SourceTree 概述 二、SourceTree 使用方法 1. 克隆 Git 仓库至本地 2. 推送本地的文件至远程仓库 3. 创建/切换/合并分支 4. 版本回退         SourceTree 是一款免费的 Git 和 Hg 客户端管理工具,支持 Git 项目的创建、克隆、提交、push、pull 和合并等操作。它拥有一个精美简

    2024年02月01日
    浏览(54)
  • rsync基本命令和用法和服务之间进行数据同步

    Rsync 是 Linux 系统下的数据镜像备份工具,使用快速增量备份工具 Remote Sync 可以远程同 步,可以在不同主机之间进行同步,可实现全量备份与增量备份,保持链接和权限,传输前执行压缩,因 此非常适合用于架构集中式备份或异地备份等应用。 官方网站:https://rsync.samba.or

    2024年02月12日
    浏览(55)
  • 微服务概述之微服务特性

    既然系统采用了微服务架构,就需要了解一些微服务的特性,这样在进行微服务开发时,脑海中才会有一些指导方向。微服务具有以下特性。 1. 服务组件化 组件是独立、可替换、可升级的软件的单元。将整体应用拆分成独立的服务组件后,当对单个组件的修改完成后,只需

    2024年01月19日
    浏览(42)
  • 微服务概述之微服务架构

    为了解决单体应用的缺点,工程师们想到将原来大的单体应用进行拆分,化整为零形成独立的应用,不过此时这些应用没有直观的入口,因此用传统应用的概念来定义就不太妥当。于是诞生了“服务”,通过服务来描述这种功能性的应用,并其他应用提供功能支持,服务于其

    2024年01月22日
    浏览(42)
  • java中Scanner的简单用法

    一.用法 1.先导入Java.util.Scanner包 2.创建Scanner类的对象 3.创建一个变量来接收数据 二.输入不同类型数据 1.输入字符串 Java中next()只能获取空格之前的数据 例如: 效果: 为了获得所以数据,我们修改输入数据的分隔符,添加sc.useDelimiter(\\\"n\\\");   效果:    2.输入整数,浮点数

    2024年02月07日
    浏览(53)
  • 03-微服务架构构建之微服务拆分

    微服务架构是将一个单体应用程序拆分为一个个独立且保持松耦合的服务的一种架构方式,每个服务有着独立的数据库并且能独立运行部署。微服务架构的构建过程中,第一步也是最为重要的一步是进行服务拆分。只有将微服务按照合理的方式进行拆分,才能确保整个项目能

    2024年02月03日
    浏览(47)
  • WeUI - 微信官方推出的免费开源 UI 组件库,上手简单,风格简约,在微信生态开发轻量项目的绝佳选择

    微信早年发布的 UI 框架,对想要创建让微信用户感到熟悉的应用来说,是一个好选择。 关于 WeUI WeUI 一款由腾讯微信团队开发的 UI 组件库,是一套同微信原生视觉体验一致的基础样式库,由微信官方设计团队为微信 Web 开发量身设计,这是专门被设计用来构建在微信运行的

    2024年02月12日
    浏览(49)
  • 【AI大模型应用开发】【LangFuse: LangSmith平替,生产级AI应用维护平台】0. 快速上手 - 基本功能全面介绍与实践(附代码)

    大家好,我是同学小张,日常分享AI知识和实战案例 欢迎 点赞 + 关注 👏, 持续学习 , 持续干货输出 。 +v: jasper_8017 一起交流💬,一起进步💪。 微信公众号也可搜【同学小张】 🙏 本站文章一览: 前面我们介绍了LangChain无缝衔接的LangSmith平台,可以跟踪程序运行步骤,提

    2024年03月21日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包