SpringBoot快速整合canal1.1.5(TCP模式)

这篇具有很好参考价值的文章主要介绍了SpringBoot快速整合canal1.1.5(TCP模式)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot快速整合canal1.1.5(TCP模式)

安装并配置MySQL主从⭐
  • 1:Docker安装MySQL8.0.28
docker pull mysql:8.0.28
  • 2:创建目录:
mkdir -p /usr/local/mysql8/data
mkdir -p /usr/local/mysql8/log
mkdir -p /usr/local/mysql8/my.conf.d
chmod -R 777 /usr/local/mysql8/
  • 3:编写my.cnf文件:
vi /usr/local/mysql8/my.conf.d/my.cnf

内容如下:(注意:把binlog-do-db的值修改成你需要canal监听的数据库名称,如果需要监听多个数据库,一定要在下面写多个binlog-do-db,而不是用“,”分隔)

[client]
# 默认字符集
# default_character_set=utf8
[mysqld]
server-id=138
# 开启二进制日志功能
log-bin=mysql-slave-bin
# binlog 记录内容的方式,记录被操作的每一行
binlog_format = ROW
# ------- >>>>指定监听的数据库(防止监听所有数据库)<<<<<----------
binlog-do-db=security-jwt-db
# 忽略大小写
lower_case_table_names=1
pid-file= /var/run/mysqld/mysqld.pid
socket= /var/run/mysqld/mysqld.sock
# 数据库数据存放目录
datadir= /var/lib/mysql
secure-file-priv= NULL
skip-symbolic-links=0
# 最大链接数
max_connections=200
# 最大失败次数
max_connect_errors=10
# 默认时区
default-time_zone='+8:00'
character-set-client-handshake=FALSE
character_set_server=utf8mb4
# default-character-set=utf8
collation-server=utf8mb4_unicode_ci
init_connect='SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci'
# 默认使用‘mysql_native_password’插件认证
default_authentication_plugin=mysql_native_password
  • 4:启动MySQL容器:
docker run \
    --name mysql8.0.28 \
    --privileged=true \
    --restart=always \
    -it -p 3308:3306 \
    -v /usr/local/mysql8/data:/var/lib/mysql \
    -v /usr/local/mysql8/log:/var/log/mysql \
    -v /usr/local/mysql8/my.conf.d/my.cnf:/etc/mysql/my.cnf \
    -e MYSQL_ROOT_PASSWORD=123456 \
    -d mysql:8.0.28
  • 5:查看mysql容器是否启动成功:
[root@centos7-sql my.conf.d]# docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED          STATUS          PORTS                                                  NAMES
ca41de7447ab   mysql:8.0.28   "docker-entrypoint.s…"   12 seconds ago   Up 11 seconds   33060/tcp, 0.0.0.0:3308->3306/tcp, :::3308->3306/tcp   mysql8.0.28
  • 6:修改密码连接模式(mysql8.0版本都要进行修改):
docker exec -it mysql /bin/bash
mysql -uroot -p'123456'
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
  • 7:在MySQL中创建一个canal用户,专门用作数据同步:
create user 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

GRANT all privileges ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;
  • 8:退出并重启MySQL容器:
docker restart mysql
docker安装canal1.1.5(版本要对应)⭐
  • 1:拉取canal1.1.5镜像:
docker pull canal/canal-server:v1.1.5
  • 2:运行canal容器:
docker run -p 11111:11111 --name canal \
-e canal.destinations=example \
-e canal.instance.master.address=192.168.184.123:3308  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=security-jwt-db\\..* \
-d canal/canal-server:v1.1.5

核心说明:(只有下面的6个参数才需要我们手动根据自己情况配置,其他可以不用变)

  • -p 11111:11111:这是canal的默认监听端口
  • -e canal.instance.master.address=192.168.184.123:3308数据库地址和端口(一定要设置成你的MySQL对外暴露的IP和端口号才行)
  • -e canal.instance.dbUsername=canal:数据库中canal用户的用户名(也就是我们之前单独创建的用户)
  • -e canal.instance.dbPassword=canal :数据库中canal用户的密码(也就是我们之前单独创建的用户)
  • -e canal.instance.filter.regex=security-jwt-db\\..*:要监听的表名称(我们这个配置的意思是:监听security-jwt-db数据库下的所有表)
  • --network:输入我们刚刚创建好的自定义网络,让canal也加入到和MySQL同一个网络中去。

表名称监听支持的语法:文章来源地址https://www.toymoban.com/news/detail-735809.html

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2 
整合SpringBoot项目⭐
  • 1:导入依赖:(这个依赖并不是alibaba官方的,而是由其他人帮我们整合了SpringBoot项目,我们只需要去导入这个依赖,再做一些简单的配置即可,非常方便!)
    • 注意:canal-spring-boot-starter的1.2.1-RELEASE版本匹配的是canal的1.1.5版本(canal的新版本没有去试过,不知道有没有问题)
<!--        springboot整合canal1.1.5(因为canal-spring-boot-starter的1.2.1-RELEASE版本匹配的是canal的1.1.5版本) -->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
  • 2:配置canal:(application.yml)
    • destination:也就是我们安装canal时执行的docker run … canal中配置的-e canal.destinations的参数内容。
    • server:配置canal所在服务器ip+端口号(默认是11111),记得修改成自己的ip地址
#配置alibaba-canal
canal:
  destination: example # canal数据同步的目的地。也就是我们安装canal时配置的example
  server: 192.168.184.123:11111 # canal的地址(canal所在服务器ip+端口号(默认是11111))
# 解决canal-spring-boot-starter一直输出日志
logging:
  level:
    top.javatool.canal.client: warn
  • 3:编写实体类(这个类也就是我们要监听操作的实体类)
@TableName(value="sys_oper_log")
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@Builder
public class OperationLog implements Serializable {

    private static final long serialVersionUID = 1L;


    @TableId("id")
    @JsonSerialize(using = ToStringSerializer.class) //解决雪花算法生成的id过长导致前端js精度丢失问题(也就是js拿到的数据和后端不一致问题)
    @ApiModelProperty(name = "id",value = "主键")
    @ExcelProperty("id")
    private Long id;

    @TableField("username")
    @ApiModelProperty("执行操作的用户名")
    @ExcelProperty("执行操作的用户名")
    private String username;

    @TableField("type")
    @ApiModelProperty("操作类型")
    @ExcelProperty("操作类型")
    private String type;

    @TableField("uri")
    @ApiModelProperty("访问的接口uri")
    @ExcelProperty("访问的接口uri")
    private String uri;

    @TableField("time")
    @ApiModelProperty("访问接口耗时")
    @ExcelProperty("访问接口耗时")
    private String time;

    @TableField("ip")
    @ApiModelProperty("执行操作的用户的ip")
    @ExcelProperty("执行操作的用户的ip")
    private String ip;

    @TableField("address")
    @ApiModelProperty("执行操作的用户的ip对应的地址")
    @ExcelProperty("执行操作的用户的ip对应的地址")
    private String address;

    @TableField("browser")
    @ApiModelProperty("执行操作的用户所使用的浏览器")
    @ExcelProperty("执行操作的用户所使用的浏览器")
    private String browser;

    @TableField("os")
    @ApiModelProperty("执行操作的用户所使用的操作系统")
    @ExcelProperty("执行操作的用户所使用的操作系统")
    private String os;

    @TableField("oper_time")
    @ApiModelProperty("操作时间")
    @ExcelProperty(value = "操作时间",converter = LocalDateTimeConverter.class)
    private LocalDateTime operTime;

    @TableLogic//逻辑删除
    @TableField("del_flag")
    @ApiModelProperty("删除标志(0代表未删除,1代表已删除)")
    @ExcelProperty(value = "删除标志",converter = DelFlagConverter.class)
    private Integer delFlag;

}
  • 4:创建一个中转类(也就是说我们报错的原因是OperationLog类中的LocalDateTime属性,这个时候我们可以创建一个新的类,其他字段不变,把这个LocalDateTime日期类变成String就可以解决这个bug)
    • 原因:(这是由于canal+SpringBoot中的StringConvertUtil类的源码的问题):
      • 可以看到下面的源码没有对LocalDateTime转LocalDatetime进行处理,而是把LocalDateTime转为String类型进行返回(问题出在type.equals(java.sql.Date.class) ? parseDate(columnValue) : columnValue),而columnValue是String类型,所以我们可以用String类型去接收LocalDateTime类型。
package com.boot.entity;

import com.alibaba.excel.annotation.ExcelProperty;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.baomidou.mybatisplus.annotation.TableName;
import com.boot.converter.DelFlagConverter;
import com.boot.converter.LocalDateTimeConverter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Column;
import java.io.Serializable;
import java.time.LocalDateTime;


/**
 * 操作日志canal中转类。解决OperationLog类中的LocalDatetime类型的字段无法被canal接收导致报错
 *
 * @author youzhengjie
 * @date 2022/10/30 21:16:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@Builder
public class OperationLogCanal implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long id;

    private String username;

    private String type;

    private String uri;

    private String time;

    private String ip;

    private String address;

    private String browser;

    private String os;

    //canal+springboot当属性名和数据库字段不一致时,要用@Column去指定数据库字段名,否则会接收不到canal数据
    @Column(name = "oper_time")
    private String operTime;

    //canal+springboot当属性名和数据库字段不一致时,要用@Column去指定数据库字段名,否则会接收不到canal数据
    @Column(name = "del_flag")
    private Integer delFlag;

}
  • 5:OperationLogService
package com.boot.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.boot.entity.OperationLog;

import java.util.List;

/**
 * 操作日志服务
 *
 * @author youzhengjie
 * @date 2022/10/21 23:32:14
 */
public interface OperationLogService extends IService<OperationLog> {

   
    long selectAllOperationLogCount();

    /**
     * 添加操作日志到elasticsearch
     *
     * @param operationLog 操作日志
     * @return boolean
     */
    boolean addOperationLogToEs(OperationLog operationLog);

    /**
     * 根据id删除elasticsearch中的操作日志
     *
     * @param id id
     * @return boolean
     */
    boolean deleteOperationLogToEs(Long id);


    /**
     * 更新elasticsearch中的操作日志
     *
     * @param operationLog 操作日志
     * @return boolean
     */
    boolean updateOperationLogToEs(OperationLog operationLog);

}
  • 6:OperationLogServiceImpl
package com.boot.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.boot.entity.OperationLog;
import com.boot.mapper.OperationLogMapper;
import com.boot.service.OperationLogService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 操作日志服务impl
 *
 * @author youzhengjie
 * @date 2022/10/21 23:42:49
 */
@Service
@Slf4j
public class OperationLogServiceImpl extends ServiceImpl<OperationLogMapper, OperationLog> implements OperationLogService {

    @Autowired
    private OperationLogMapper operationLogMapper;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * 操作日志的es索引
     */
    private static final String OPER_LOG_INDEX="operation-log-index";

    @Override
    public boolean addOperationLogToEs(OperationLog operationLog) {

        try {
            IndexRequest indexRequest = new IndexRequest(OPER_LOG_INDEX);
            indexRequest.id(operationLog.getId().toString());

            Map<String, Object> sources = new ConcurrentHashMap<>();
            sources.put("username", operationLog.getUsername());
            sources.put("type", operationLog.getType());
            sources.put("uri", operationLog.getUri());
            sources.put("time", operationLog.getTime());
            sources.put("ip", operationLog.getIp());
            sources.put("address",operationLog.getAddress());
            sources.put("browser", operationLog.getBrowser());
            sources.put("os", operationLog.getOs());
            sources.put("operTime", operationLog.getOperTime());
            sources.put("delFlag", operationLog.getDelFlag());

            indexRequest.source(sources);

            restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }

    }

    @Override
    public boolean deleteOperationLogToEs(Long id) {

        try {
            DeleteRequest deleteRequest = new DeleteRequest(OPER_LOG_INDEX);
            deleteRequest.id(id.toString());
            restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);
            return true;
        }catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean updateOperationLogToEs(OperationLog operationLog) {

        try {
            //将operationLog封装成Map
            Map<String,Object> operationLogMap=new ConcurrentHashMap<>();
            //将operationLog拷贝到Map中
            BeanUtil.copyProperties(operationLog,operationLogMap);
            //把map中的id去掉
            operationLogMap.remove("id");

            String idStr = operationLog.getId().toString();
            UpdateRequest updateRequest = new UpdateRequest(OPER_LOG_INDEX,idStr);
            updateRequest.doc(operationLogMap);
            restHighLevelClient.update(updateRequest,RequestOptions.DEFAULT);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }

    }

}
  • 7:创建一个canal监听类(非常核心),指定监听一个表(这里我监听的是sys_oper_log表):
package com.boot.canal;

import cn.hutool.core.bean.BeanUtil;
import com.boot.entity.OperationLog;
import com.boot.entity.OperationLogCanal;
import com.boot.service.OperationLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

/**
 * 操作日志canal处理器
 *
 * @author youzhengjie
 * @date 2022/10/30 15:19:09
 */
@CanalTable("sys_oper_log") //@CanalTable("sys_oper_log"):指定canal监听的表名为sys_oper_log
@Component
@Slf4j
public class OperationLogCanalHandle implements EntryHandler<OperationLogCanal> {

    @Autowired
    private OperationLogService operationLogService;

    @Override
    public void insert(OperationLogCanal operationLogCanal) {

        //编写mysql和缓存同步的逻辑(例如JVM本地缓存、Redis分布式缓存、es等)
        OperationLog operationLog = new OperationLog();
        //bean拷贝
        BeanUtil.copyProperties(operationLogCanal,operationLog);
        //同步到es中
        operationLogService.addOperationLogToEs(operationLog);
        log.warn("OperationLogCanalHandle->insert->开始同步->"+operationLog);

    }

    /**
     * 更新
     *
     * @param before 之前
     * @param after  之后
     */
    @Override
    public void update(OperationLogCanal before, OperationLogCanal after) {

        //编写mysql和缓存同步的逻辑(例如JVM本地缓存、Redis分布式缓存、es等)
        OperationLog operationLog = new OperationLog();
        //注意:要拷贝after对象,这个对象是修改之后的对象
        BeanUtil.copyProperties(after,operationLog);
        //同步es
        operationLogService.updateOperationLogToEs(operationLog);
        log.warn("OperationLogCanalHandle->update->开始同步->"+operationLog);

    }

    @Override
    public void delete(OperationLogCanal operationLogCanal) {
        //编写mysql和缓存同步的逻辑(例如JVM本地缓存、Redis分布式缓存、es等)
        Long id = operationLogCanal.getId();
        //同步es
        operationLogService.deleteOperationLogToEs(id);
        log.warn("OperationLogCanalHandle->delete->开始同步->"+id);
    }
}

到了这里,关于SpringBoot快速整合canal1.1.5(TCP模式)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot整合Canal实现数据同步到ElasticSearch

    canal 译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal原理就是伪装成mysql的从节点,从而订阅master节点的binlog日志 Canal原理: canal模拟mysql slave的交

    2024年02月06日
    浏览(37)
  • Spring Boot整合canal实现数据一致性解决方案解析-部署+实战

    🏷️ 个人主页 :牵着猫散步的鼠鼠  🏷️ 系列专栏 :Java全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   1.前言 2.canal部署安装 3.Spring Boot整合canal 3.1数据库与缓存一致性问题概述 3.2 整合canel 4.总结 canal [kə\\\'næl]  ,译意为水道/管道/沟渠,主要用途是 基于

    2024年03月19日
    浏览(43)
  • spring boot +springboot集成es7.9.1+canal同步到es

    未经许可,请勿转载。 其实大部分的代码是来源于 参考资料来源 的 主要代码实现 ,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看 主要代码实现 ,结合我的来使用。 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没

    2023年04月11日
    浏览(74)
  • Spring 整合 Mybatis -- Spring快速入门保姆级教程(四)

    为了巩固所学的知识,作者尝试着开始发布一些学习笔记类的博客,方便日后回顾。当然,如果能帮到一些萌新进行新技术的学习那也是极好的。作者菜菜一枚,文章中如果有记录错误,欢迎读者朋友们批评指正。 (博客的参考源码可以在我主页的资源里找到,如果在学习的

    2024年02月07日
    浏览(50)
  • 【Java】SpringBoot快速整合Redis

            文末有源码gitee地址         【面试】浅学Redis_redis 广播-CSDN博客         Redis是一种 高性能开源的基于内存的,采用键值对存储的非关系型数据库 ,它支持多种数据结构,包括字符串、哈希表、列表、集合、有序集合等。Redis的特点之一是 数据存储在内存

    2024年01月19日
    浏览(35)
  • SpringBoot快速整合RabbitMq小案例

    对于一个直接创建的springBoot项目工程来说,可以按照以下步骤使用rabbitmq 添加依赖:添加rabbitMQ的依赖。 配置连接:在配置文件中配置虚拟主机、端口号、用户名、密码等信息。 创建生产者:导入对应依赖后,使用rabbitTemplate,并调用convertAndSend来发送消息。 来发送消息。

    2024年02月09日
    浏览(33)
  • springboot整合mybatis代码快速生成

    特别说明:本次项目整合基于idea进行的,如果使用Eclipse可能操作会略有不同,不过总的来说不影响。 springboot整合之如何选择版本及项目搭建 springboot整合之版本号统一管理  springboot整合mybatis-plus+durid数据库连接池 springboot整合swagger springboot整合mybatis代码快速生成 springboot整

    2024年02月02日
    浏览(27)
  • SpringSecurity框架快速搭建(SpringBoot整合Security)

    目录 Common类 Config类 CorsConfig(解决跨域问题) RedisConfig (Redis数据库配置) Spring Security (配置安全功能的类) expression类(Expression 类通常用于权限控制和安全策略的定义) SGExpressionRoot(判断用户是否具有某个权限) Filter类 JwtAuthenticationTokenFilter(解析token看是否放行) Handler类

    2024年02月09日
    浏览(34)
  • springboot快速整合腾讯云COS对象存储

    1、导入相关依赖 2、编写配置类,获取配置信息 创建配置类主要需要以下信息 腾讯云账号秘钥 和 密码秘钥: 用于创建COSClient链接对象,识别用户身份信息 存储桶区域 :需要设置客户端所属区域Region 存储桶名称 :创建请求时,需要告知上传到哪个存储桶下 存储桶访问路径

    2024年02月15日
    浏览(38)
  • SpringBoot整合第三方技术 -- SpringBoot快速入门保姆级教程(三)

    为了巩固所学的知识,作者尝试着开始发布一些学习笔记类的博客,方便日后回顾。当然,如果能帮到一些萌新进行新技术的学习那也是极好的。作者菜菜一枚,文章中如果有记录错误,欢迎读者朋友们批评指正。 (博客的参考源码可以在我主页的资源里找到,如果在学习的

    2024年02月09日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包