Canal实时同步MySQL数据到ES

这篇具有很好参考价值的文章主要介绍了Canal实时同步MySQL数据到ES。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、canal简介

canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

二、工作原理

canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向MySQL主库发送dump协议,MySQL主库收到dump请求会向canal推送binlog,canal通过解析binlog将数据同步到其他存储中去。

Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch

三、实操

3.1 组件下载

  • 首先下载canal的各个组件canal-server、canal-adapter、canal-admin,下载地址:https://github.com/alibaba/canal/releases
    Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch

  • canal的各个组件的用途:

    canal-deploy:可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。
    canal-adapter:相当于canal的客户端,会从canal-deploy中获取数据,然后对数据进行同步。
    canal-admin:为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

  • 版本约束

应用 版本
MySQL 5.7
Elasticsearch 7.6.2
Kibanba 7.6.2
MySQL 5.7
Canal 1.1.5-alpha-2

3.2 MySQL配置

  • 由于canal是通过订阅MySQL的binlog来实现数据同步的,所以需要开启MySQL的binlog写入功能,并设置binlog-format为ROW模式,我的配置文件为 D:\environment\mysql-5.7.9\my.ini,修改如下内容即可;
[mysqld]
## 指定不需要同步的数据库名称
binlog-ignore-db=mysql  
## 开启二进制日志功能
log-bin=D:\environment\mysql-5.7.9\mysql-bin
## 设置二进制日志使用内存大小(事务)
binlog_cache_size=1M  
## 设置使用的二进制日志格式(mixed,statement,row)
binlog_format=row  
## 二进制日志过期清理时间。默认值为0,表示不自动清理。
expire_logs_days=7  
## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。
## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致
slave_skip_errors=1062  

  • 配置完成后需要重新启动MySQL,重启成功后通过如下命令查看binlog是否启用;
show variables like '%log_bin%'

Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch

  • 再查看下MySQL的binlog模式;
show variables like 'binlog_format%';  

Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch

  • 接下来需要创建一个拥有从库权限的账号,用于订阅binlog,这里创建的账号为canal:canal;
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  • 创建好测试用的数据库canal-test,之后创建一张商品表product,建表语句如下。
#产品表
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `price` decimal(10, 2) NULL DEFAULT NULL,
  `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

#用户
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
  `id` varchar(64) NOT NULL,
  `username` varchar(20) NOT NULL COMMENT '用户名',
  `phone` varchar(64) NOT NULL COMMENT '手机号',
  `nickname` varchar(20) NOT NULL COMMENT '昵称',
  `last_modified` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `id` (`id`),
  UNIQUE KEY `username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.3 canal-server使用

  • 解压 canal.deployer-1.1.5-SNAPSHOT.tar.gz,到指定目录canal-server
    Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch
  • 修改配置文件conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置;
# 需要同步数据的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*

  • 双击bin/startup.bat脚本启动canal-server服务;

  • 查看服务日志信息;
    Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch

3.4 canal-adapter使用

  • 解压 canal.adapter-1.1.5-SNAPSHOT.tar.gz,到指定目录canal-adpter
    Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch
  • 修改配置文件conf/application.yml
canal.conf:
  mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 对应集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批数量
  retries: 0 # 重试次数, -1为无限重试
  timeout: # 同步超时时间, 单位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources: # 源数据库配置
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal-test?useUnicode=true
      username: canal
      password: canal
  canalAdapters: # 适配器列表
    - instance: example # canal实例名或者MQ topic名
      groups: # 分组列表
        - groupId: g1 # 分组id, 如果是MQ模式将用到该值
          outerAdapters:
            - name: logger # 日志打印适配器
            - name: es7 # ES同步适配器
              hosts: 127.0.0.1:9200 # ES连接地址
              properties:
                mode: rest # 模式可选transport(9300) 或者 rest(9200)
                # security.auth: test:123456 #  only used for rest mode
                cluster.name: elasticsearch # ES集群名称

  • 新增 conf/es7/product.yml,用于同步数据库product表
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: canal_product # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT p.id AS _id, p.title, p.sub_title, p.price, p.pic FROM product p"        # sql映射
  etlCondition: "where a.c_time>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小

  • 新增 conf/es7/mytest_user.yml,用于同步数据库users表
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: mytest_user # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT id as _id,username as username,phone as phone,nickname as nickname,last_modified as last_modified FROM users"
  etlCondition: "where a.c_time>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小

  • 双击startup.bat脚本启动canal-adapter服务;
  • 查看日志 ,如下启动成功
    Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch

四、数据同步演示

4.1 es 新增 mapping

PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "sub_title": {
        "type": "text"
      },
      "pic": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}


PUT /mytest_user/
{
  "mappings": {
     "properties": {
        "id": {
          "type": "keyword"
        },
        "username": {
          "type": "text"
        },
        "phone": {
          "type": "text"
        },
        "nickname": {
          "type": "text"
        },
        "last_modified":{
          "type": "date"
        }
     }
  }
}

4.2 插入数据

INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 5, '荣耀', ' 全面屏游戏智能手机 8GB+64GB', 1999.00, Null);

INSERT INTO `users` VALUES ('1', 'level', '1507407****', '爸爸', '2023-12-13 15:15:35');

4.3 查询es 数据

GET canal_product/_search

Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch
Canal实时同步MySQL数据到ES,Elasticsearch,笔记,软件安装,mysql,elasticsearch

五、canal数据同步到ES配置常见报错

canal数据同步到ES配置常见报错文章来源地址https://www.toymoban.com/news/detail-762046.html

到了这里,关于Canal实时同步MySQL数据到ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch

    本篇文章在Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性-CSDN博客 基础上使用canal将mysql数据实时同步到Elasticsearch。 公共包 实体类Sku @Column注解 用来标识实体类中属性与数据表中字段的对应关系 name 定义了被标注字段在数据库表中所对应字段的名称;由

    2024年02月03日
    浏览(29)
  • 实时同步ES技术选型:Mysql+Canal+Adapter+ES+Kibana

    基于之前的文章,精简操作而来 让ELK在同一个docker网络下通过名字直接访问 Ubuntu服务器ELK部署与实践 使用 Docker 部署 canal 服务实现MySQL和ES实时同步 Docker部署ES服务,canal全量同步的时候内存爆炸,ES/Canal Adapter自动关闭,CPU100% 2.1 新建mysql docker 首先新建数据库的docker镜像

    2024年02月11日
    浏览(32)
  • Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器

    目录 一. 前言 二. Canal 简介和使用场景 2.1. Canal 简介 2.2. Canal 使用场景 三. Canal Server 设计 3.1. 整体设计 3.2. EventParser 设计 3.3. CanalLogPositionManager 设计 3.4. CanalHAController 类图设计 3.5. EventSink 类图设计和扩展 3.6. EventStore 类图设计和扩展 3.7. MetaManager 类图设计和扩展 四. Can

    2024年01月25日
    浏览(39)
  • 本地部署Canal笔记-实现MySQL与ElasticSearch7数据同步

    本地搭建canal实现mysql数据到es的简单的数据同步,仅供学习参考 建议首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki 本地搭建MySQL数据库 本地搭建ElasticSearch 本地搭建canal-server 本地搭建canal-adapter 本地环境为window11,大部分组件采用docker进行部署,MySQL采用8.0.27, 推荐

    2024年02月02日
    浏览(37)
  • 使用 Docker 部署 canal 服务实现MySQL和ES实时同步

    参考 ClientAdapter: Canal的Adapter配置项目 Sync ES:Canal的Adapter中ES同步的配置项 使用 Docker 部署 canal 服务 docker canal-server canal-adapter mysql Canal(基于Docker同步mysql数据到elasticsearch) Canal部署过程中的错误 Canal 1.1.4 Canal Adapter 1.1.4 Kibana: 6.8.8 ElasticSearch: 6.4.3 由于Canal 1.1.4只能适配 Ela

    2024年02月13日
    浏览(36)
  • canal同步mysql数据到es中

    项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表和读写分离,每张表数据控制在500W一下,但是效率还是达不到要求,为了提高查询效率,我们使用ES查询。 而将mysql实时同步到es中保证数据一致性就成了我们的工作之下。 jdk1.8(依赖jdk环境,需要先

    2023年04月08日
    浏览(37)
  • 利用Canal把MySQL数据同步到ES

    Canal是阿里巴巴开源的一个数据库变更数据同步工具,主要用于 MySQL 数据库的增量数据到下游的同步,例如同步到 Elasticsearch、HBase、Hive 等。下面是一个基本的步骤来导入 MySQL 数据库到 Elasticsearch。 安装和配置 Canal 首先,需要在你的机器上安装并配置Canal。具体步骤可在 C

    2024年02月16日
    浏览(34)
  • 基于Canal同步MySQL数据到Elasticsearch

    基于 canal 同步 mysql 的数据到 elasticsearch 中。 相关软件的安装请参考:《Canal实现数据同步》 1.1 pom依赖 1.2 SimpleCanalClientExample编写 注意当后面 canal-adapter 也连接上 canal-server 后,程序就监听不到数据变化了。 这个类只是测试,下面不使用。 由于目前 canal-adapter 没有官方dock

    2024年02月07日
    浏览(42)
  • 使用canal+rocketmq实现将mysql数据同步到es

    实际开发过程中,经常遇到数据库与缓存不一致的问题,造成这种问题的原因有很多,其中缓存数据没有及时更新、缓存中过期的数据没有及时更新,导致缓存中存在失效数据,导致数据库与缓存不一致。而这种问题的出现大部分都是因为同步延迟、缓存失效、过期和错误使

    2024年02月11日
    浏览(31)
  • docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

    🚀 本文提供的指令完全可以按顺序逐一执行,已进行了多次测试。因此如果你是直接按照我本文写的指令一条条执行的,而非自定义修改过,执行应当是没有任何问题的。 🚀 本文讲述:使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elas

    2024年02月02日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包