基于Canal同步MySQL数据到Elasticsearch

这篇具有很好参考价值的文章主要介绍了基于Canal同步MySQL数据到Elasticsearch。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基于Canal同步MySQL数据到Elasticsearch

基于 canal 同步 mysql 的数据到 elasticsearch 中。

1、canal-server

相关软件的安装请参考:《Canal实现数据同步》

1.1 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>canal-to-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>canal-to-elasticsearch</name>
    <description>canal to elasticsearch</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2 SimpleCanalClientExample编写

package com.example.canatest.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 说明:用于测试canal是否已经连接上了mysql
 */
public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.94.186",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

这个类只是测试,下面不使用。

2、canal-adapter

由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的。

canal-adapter安装:

搜索镜像

$ docker search canal-adapter

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

拉取镜像

$ docker pull slpcat/canal-adapter:v1.1.5

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

启动

$ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

修改配置

$ docker exec -it 89ef714d3a0e /bin/bash
$ cd conf/
$ vi application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host需要修改
    canal.tcp.server.host: 192.168.94.186:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # url,username,password需要修改
      url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      # name需要修改
      - name: es7
        # hosts需要修改
        hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          # cluster.name需要修改
          cluster.name: my-es
$ cd conf/es7
$ cp -v mytest_user.yml canal_test_collect.yml
# 删除其他多余的
$ rm -rf biz_order.yml customer.yml mytest_user.yml
$ vi dailyhub_collect.yml
dataSourceKey: defaultDS
# 需要修改
destination: example
# 需要修改
groupId: g1
esMapping:
  # 需要修改
  _index: canal_test
  _id: _id
  _type: _doc
  upsert: true
#  pk: id
  # 需要修改
  sql: "
SELECT
        c.id AS _id,
        c.user_id AS userId,
        c.title AS title,
        c.url AS url,
        c.note AS note,
        c.collected AS collected,
        c.created AS created,
        c.personal AS personal,
        u.username AS username,
        u.avatar AS userAvatar
FROM
        m_collect c
LEFT JOIN m_user u ON c.user_id = u.id

"
#  objFields:
#    _labels: array:;
#   etlCondition: "where c.c_time>={}"
  commitBatch: 3000

也可以在外面编辑好,通过docker命令传输到docker容器中:

$ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
$ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml

重启容器

$ docker restart 89ef714d3a0e

验证是否启动成功

$ docker logs -f 89ef714d3a0e

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动

设置格式。

3、测试

准备测试条件:

1、首先在数据库中生成表和字段

CREATE TABLE `m_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `avatar` varchar(255) DEFAULT NULL,
  `created` date DEFAULT NULL,
  `lasted` date DEFAULT NULL,
  `open_id` varchar(255) DEFAULT NULL,
  `statu` int(11) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

CREATE TABLE `m_collect` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `collected` date DEFAULT NULL,
  `created` date DEFAULT NULL,
  `note` varchar(255) DEFAULT NULL,
  `personal` int(11) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `url` varchar(255) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
  CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

2、然后在elasticsearch中生成索引

# 创建索引并添加映射字段
PUT /canal_test
{
  "mappings": {
    "properties": {
      "collected": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "created": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "note": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "personal": {
        "type": "integer"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "url": {
        "type": "text"
      },
      "userAvatar": {
        "type": "text"
      },
      "userId": {
        "type": "long"
      },
      "username": {
        "type": "keyword"
      }
    }
  }
}

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

3、插入数据

INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', 'MarkerHub');

基于Canal同步MySQL数据到Elasticsearch,elasticsearch,数据库,elasticsearch,数据库

4、查看数据

GET /canal_test/_search

5、遇到的问题

如果看到canal-adapter一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysqlescanal

adapar文章来源地址https://www.toymoban.com/news/detail-733324.html

2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

到了这里,关于基于Canal同步MySQL数据到Elasticsearch的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 本地部署Canal笔记-实现MySQL与ElasticSearch7数据同步

    本地搭建canal实现mysql数据到es的简单的数据同步,仅供学习参考 建议首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki 本地搭建MySQL数据库 本地搭建ElasticSearch 本地搭建canal-server 本地搭建canal-adapter 本地环境为window11,大部分组件采用docker进行部署,MySQL采用8.0.27, 推荐

    2024年02月02日
    浏览(37)
  • Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch

    本篇文章在Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性-CSDN博客 基础上使用canal将mysql数据实时同步到Elasticsearch。 公共包 实体类Sku @Column注解 用来标识实体类中属性与数据表中字段的对应关系 name 定义了被标注字段在数据库表中所对应字段的名称;由

    2024年02月03日
    浏览(29)
  • 基于Canal实现MySQL 8.0 数据库数据同步

    主机名称 操作系统 说明 192.168.11.82 Ubuntu 22.04 主库所在服务器 192.168.11.28 Oracle Linux Server 8.7 从库所在服务器 1、Ubuntu系统下MySQL配置文件位置 2、CentOS系统下MySQL配置文件位置 3、添加如下配置,开启MySQL binlog功能 关于canal简介,这里就不再阐述,具体可以参看官方文档介绍,地

    2023年04月23日
    浏览(57)
  • docker安装canal1.1.5监控mysql的binlog日志并配置rocketmq进行数据同步到elasticsearch(超级大干货)

    1、直接拉取canal镜像 2、创建canal文件夹,用来存在容器挂载到宿主机的目录或文件(注:本实例在/home下操作) 3、先启动canal容器,把需要挂载的目录都copy出来,本例子只挂载了conf和logs目录(自己还想挂载啥东西就进去容器里面看看呗,docker exec -it canal /bin/bash)   4、第

    2024年02月07日
    浏览(39)
  • SpringBoot整合Canal实现数据同步到ElasticSearch

    canal 译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal原理就是伪装成mysql的从节点,从而订阅master节点的binlog日志 Canal原理: canal模拟mysql slave的交

    2024年02月06日
    浏览(37)
  • 企业级开发项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步

    1、商品上架时:search-service新增商品到elasticsearch 2、商品下架时:search-service删除elasticsearch中的商品 数据同步是希望,当我们商品修改了数据库中的商品信息,索引库中的信息也会跟着改。在微服务中数据库和索引库是在两个不同的服务中。如果,商品的服务,向es的服务中

    2024年02月12日
    浏览(45)
  • elasticsearch+canal增量、全量同步

    目录 一、搭建环境: 1.1 下载软件上传到linux目录/data/soft下 1.2  把所有软件解压到/data/es-cluster 二、单节点(多节点同理)集群部署elasticsearch 2.1 创建es用户 2.2 准备节点通讯证书 2.3 配置elasticsearch,编辑/data/es-cluster/elasticsearch-7.9.0-node1/config/elasticsearch.yml文件 2.4 在每一台集群

    2024年01月24日
    浏览(36)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(35)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(35)
  • ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

    目录 一、数据同步 1.1、什么是数据同步 1.2、解决数据同步面临的问题 1.3、解决办法 1.3.1、同步调用 1.3.2、异步通知(推荐) 1.3.3、监听 binlog 1.3、基于 RabbitMQ 实现数据同步 1.3.1、需求 1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听 1.3.3、在“酒店

    2024年02月08日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包