Canal1--搭建Canal监听数据库变化

这篇具有很好参考价值的文章主要介绍了Canal1--搭建Canal监听数据库变化。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.安装mysql

默认安装了mysql(版本8.0.x);

新创建用户

-- 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';

授权

grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' with grant option;

flush privileges;

Canal1--搭建Canal监听数据库变化,数据库

查看MySQL是否开启binlog模式

show variables like 'log_bin';

Canal1--搭建Canal监听数据库变化,数据库
查看当前正在写入的binlog日志:

show master status;

Canal1--搭建Canal监听数据库变化,数据库
记住文件名和偏移量

2.安装Canal

去官网下载页面进行下载;

我这里下载的是1.1.7的版本:
Canal1--搭建Canal监听数据库变化,数据库
解压canal.deployer-1.1.7.tar.gz,我们可以看到里面有五个文件夹:
Canal1--搭建Canal监听数据库变化,数据库
打开配置文件conf/example/instance.properties,配置信息如下:

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0

# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=自己的日志名称
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=日志的偏移量
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

开启Canal服务端:
进入bin目录

.\startup.bat

3.Java客户端操作

首先引入maven依赖:

<!--canal客户端-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.7</version>
        </dependency>

然后创建一个CanalClient类

import com.alibaba.otter.canal.client.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements InitializingBean {

    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    printEntry(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (RowData rowData : rowChage.getRowDatasList()) {
                //如果是删除语句
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增语句
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的语句
                } else {
                    //变更前的数据
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //变更后的数据
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

完成之后再对应数据库进行操作,控制台会打印对应的操作,说明对数据的写入操作进行了有效的监控;
注意只读操作并不会写入binlog也不会被Canal监控到(也没必要监控读取操作)。
Canal1--搭建Canal监听数据库变化,数据库文章来源地址https://www.toymoban.com/news/detail-858888.html

到了这里,关于Canal1--搭建Canal监听数据库变化的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C#实现数据库数据变化监测(sqlserver&mysql)

    监测数据库表数据变化,可实现数据库同步(一主一从(双机备份),一主多从(总部数据库,工厂1,工厂2,工厂数据合并到总部数据)) sqlserver 启用数据库监听服务 ALTER DATABASE test SET NEW_BROKER WITH ROLLBACK IMMEDIATE; ALTER DATABASE test SET ENABLE_BROKER; SELECT is_broker_enabled FROM sys.dat

    2024年02月14日
    浏览(52)
  • Oracle运维(数据库、监听、重启)

    shutdown有四个参数,四个参数的含义如下: Normal 需要等待所有的用户断开连接 Immediate 等待用户完成当前的语句 Transactional 等待用户完成当前的事务 Abort 不做任何等待,直接关闭数据库 normal需要在所有连接用户断开后才执行关闭数据库任务,所以有的时候看起来好象命令没

    2024年02月06日
    浏览(72)
  • 数据库监听器停止与启动

    切换至安装oracle数据库的那个用户,一般为oracle(在root下是安装或是启动不了oracle的); # su oracle 然后启动监听器 # lsnrctl start 会看到启动成功的界面; 停止监听器命令. lsnrctl stop 可以修改oracle的ora文件,对数据库进行配置,在opt/oracle/product/9.2.0/network/admin 目录中,修改相应的ora文件即

    2024年02月07日
    浏览(50)
  • Java 实现实时监听MySQL数据库变更MySQLBinListener

    目录 1、导出需要的类和接口 2、 定义 MySQLBinlogListener类 3、私有方法,启动重连定时器 4、完整代码   编写一个MySQL数据库实时变更的监听器。 为什么要编写这个一个监听器:为了实时监测和响应MySQL数据库中的变更事件 实时数据同步:通过监听MySQL Binlog,可以捕获数据库的

    2024年02月16日
    浏览(54)
  • Linux下Oracle的数据库和监听启动关闭命令

    sqlplus /nolog conn /as sysdba connect sys/123456 as sysdba; (123456为用户密码) startup startup命令它有三种情况: 第一种:不带参数,启动数据库实例并打开数据库,以便用户使用数据库,在多数情况下,使用这种方式! 第二种:带nomount参数,只启动数据库实例,但不打开数据库,在你希

    2024年02月04日
    浏览(84)
  • SpringBoot整合Canal+RabbitMQ监听数据变更

    需求 步骤 环境搭建 整合SpringBoot Canal实现客户端 Canal整合RabbitMQ SpringBoot整合RabbitMQ   我想要在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。 经过调研发现,使用Canal来监听MySQL的binlog变化可

    2024年02月11日
    浏览(46)
  • openGauss学习笔记-200 openGauss 数据库运维-常见故障定位案例-表文件大小无变化

    200.1 VACUUM FULL一张表后,表文件大小无变化 200.1.1 问题现象 使用VACUUM FULL命令对一张表进行清理,清理完成后表大小和清理前一样大。 200.1.2 原因分析 假定该表的名称为table_name,对于该现象可能有以下两种原因: table_name表本身没有delete过数据,使用VACUUM FULL table_name后无需清

    2024年01月18日
    浏览(55)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

    Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步数据库

    2024年02月03日
    浏览(51)
  • Sql Server 数据库事务与锁,同一事务更新又查询锁的变化,期望大家来解惑!

    我有一个People表,有三行数据: 如果我们没详细了解数据库事务执行加锁的过程中,会不会有这样一个疑问:如下的这段 SQL 开启了事务,并且在事务中进行了更新和查询操作。 我们知道sql server数据库的默认事务级别是READ COMMITTED(已提交的读取),我们再看一下已提交读事

    2024年02月01日
    浏览(72)
  • Vue组件使用(父组件监听子组件数据变化,子组件使用父组件的数据,并监听父组件的数据变化)

    父组件声明变量 父组件向子组件传递数据 Vue 数据类型 type 有以下几种: String:字符串类型。例如:“hello world”。 Number:数字类型。例如:12,1.5。 Boolean:布尔类型。例如:true,false。 Object:对象类型。例如:{name: ‘Tom’, age: 20}。 Array:数组类型。例如:[1, 2, 3]。 Fun

    2024年02月14日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包