canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

这篇具有很好参考价值的文章主要介绍了canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
canal基于MySQL数据库增量日志解析,提供增量数据订阅和消费,是阿里开源CDC工具,它可以获取MySQL binlog数据并解析,然后将数据变动传输给下游。基于canal,可以实现从MySQL到其他数据库的实时同步

工作原理

MySQL主备复制原理

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)

以上来自canal的github介绍,链接:https://github.com/alibaba/canal

canal 使用流程

canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

  1. 部署Deployer服务,该服务负责从上游拉取binlog数据、记录位点等
  2. 部署Client-Adapter服务,该服务负责对接Deployer解析过的数据,并将数据传输到目标库中。
  3. 部署完成后,canal默认会自动同步MySQL增量数据
  4. 如果需要同步MySQL全量数据,请手动调用Client-Adapter服务的方法触发同步任务。
    待全量数据同步完成后,canal会自动开始增量同步。

环境使用版本

需要注意版本对应,canal1.1.6版本需要jdk11,canal1.1.5版本支持jdk8

应用 版本
mysql 8.0.28
elasticsearch 7.9.2
canal 1.1.5
jdk 8

MySQL环境搭建

1.修改mysql配置文件

配置数据库my.cnf文件,如果是windows则配置my.ini文件

#开启二进制日志功能
log-bin=mall-mysql-bin
#设置使用的二进制日志格式(mixed,statement,row)
binlog_format=row

canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)配置完成重启mysql实例

# 重启mysql命令
systemctl restart mysqld

登录mysql后校验是否开启

# 登录mysql,需要密码输入
mysql -uroot -p
# 查看日志开启的sql
show variables like '%log_bin%'; 
mysql> show variables like '%log_bin%';
+---------------------------------+-------------------------------------+
| Variable_name                   | Value                               |
+---------------------------------+-------------------------------------+
| log_bin                         | ON                                  |
| log_bin_basename                | /var/lib/mysql/mall-mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
+---------------------------------+-------------------------------------+
6 rows in set (0.12 sec)

查看是否为row模式

mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.02 sec)

2.创建并赋权从库账号

创建从库权限账号canal,用于订阅binlog

#创建用户,密码自己填写,由于创建用户时默认的密码加密方式为caching_sha2_password,所以修改为mysql_native_password,否则服务端启动时可能会报错
create user 'canal'@'%' identified with mysql_native_password by 'Password@123';

# 给新创建账户赋予从库权限
 grant select, replication slave, replication client on *.* to 'canal'@'%';
 
# 刷新权限
flush privileges;

3.创建测试数据库

 CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;

创建测试数据表

CREATE TABLE `test_book` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `title` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '题名',
  `isbn` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'isbn',
  `author` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '作者',
  `publisher_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '出版社名',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC

ES环境搭建

1.创建索引

canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

2.建立映射

{
  "properties": {
    "id": {
      "type": "long"
    },
    "title": {
      "type": "text"
    },
    "isbn": {
      "type": "text"
    },
    "author": {
      "type": "text"
    },
    "publisherName": {
      "type": "text"
    }
  }
}

canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

canal的下载部署

下载canal

下载地址:https://github.com/alibaba/canal/releases
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
下载解压到服务器指定目录,并给文件夹及其子文件赋权
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

chmod -R 755 /home/canal-1.1.5/

配置服务端 canal-deployer

canal-deployer伪装成mysql的从库,监听binlog接收数据,目录结构如下:
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

1.修改配置/conf/canal.properties

#canal的server地址:127.0.0.1,除了ip和port外,其他配置可不改动
canal.ip =127.0.0.1
#canal端口,用于客户端监听
canal.port = 11111

2.修改配置/conf/example/instance.properties

#被同步的mysql地址
canal.instance.master.address=127.0.0.1:3306
#数据库从库权限账号
canal.instance.dbUsername=canal
#数据库从库权限账号的密码
canal.instance.dbPassword=Password@123
#数据库连接编码 
canal.instance.connectionCharset = UTF-8 
#需要订阅binlog的表过滤正则表达式
#canal.instance.filter.regex=.*\\..*
#我们只监听数据同步表
canal.instance.filter.regex=cxstar_oa.data_sync_es
#这里与文件夹名保持一致,后面会用到
canal.mq.topic=example

3.启动canal-deployer

进入bin目录,执行启动命令:

./startup.sh

查看日志:/logs/canal/canal.log

2023-02-02 15:28:16.016 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-02-02 15:28:16.043 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-02-02 15:28:16.054 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-02-02 15:28:16.112 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]
2023-02-02 15:28:17.824 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

查看日志:/logs/canal/canal.log

2023-02-02 15:28:17.590 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2023-02-02 15:28:17.619 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-02-02 15:28:17.619 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-02-02 15:28:17.757 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-02-02 15:28:17.776 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-02-02 15:28:17.776 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2023-02-02 15:28:18.382 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mall-mysql-bin.000008,position=12380,serverId=101,gtid=,timestamp=1675309792000] cost : 610ms , the next step is binlog dump

日志如上就已经成功启动

可能的问题: caching_sha2_password Auth failed
原因:
使用mysql版本为8.0,而创建用户时默认的密码加密方式为caching_sha2_password,所以修改为mysql_native_password

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '密码'; #更新一下用户密码
FLUSH PRIVILEGES; #刷新权限

配置客户端canal-adapter

canal-adapter:作为canal的客户端,会从canal-server中获取数据,然后同步数据到MySQL、Elasticsearch等存储中去。目录结构如下:
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

1.替换client-adapter.es7的jar文件

下载v1.1.5-alpha-2,解压后找到plugin目录下的
client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
增加执行权限

chmod -R 755 /home/canal-1.1.5/canal.adapter-1.1.5/plugin

关于为何这么做的解释,如果不替换,在启动的时候会报错:
java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

2.修改配置/conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
    
canal.conf:
  mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 对应集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批数量
  retries: 0 # 重试次数, -1为无限重试
  timeout: # 同步超时时间, 单位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
 
  srcDataSources: # 源数据库配置
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true #测试数据库连接
      username: root #数据库账号
      password: Passwd@2014 #数据库密码
  canalAdapters: # 适配器列表
  - instance: example # canal实例名或者MQ topic名
    groups: # 分组列表
    - groupId: g1 # 分组id, 如果是MQ模式将用到该值
      outerAdapters:
      - name: logger # 日志打印适配器
      - name: es7 # ES同步适配器
        hosts: 192.168.0.182:9200 # ES连接地址
        properties:
          mode: rest # 模式可选transport(9300) 或者 rest(9200)
          #security.auth: elastic:123456 #  连接es的用户和密码,仅rest模式使用
          cluster.name: elasticsearch # ES集群名称, 与es目录下 elasticsearch.yml文件cluster.name对应

3.增加mysql同步es的映射文件

进入/conf/es7目录下,复制mytest_user.yml命名为test_book.yml,同时修改:

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: test_book # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT
         tb.id AS _id,
         tb.title,
         tb.isbn,
         tb.author,
         tb.publisher_name as publisherName
        FROM
         test_book tb"        # sql映射
  etlCondition: "where p.id>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小

4.启动canal-adapter

启动canal-adapter,进入bin目录,执行启动命令:

./startup.sh

日志如下即表示启动成功

2023-04-02 13:22:52.337 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-04-02 13:22:52.337 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-04-02 13:22:52.338 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.372 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-04-02 13:22:52.679 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2023-04-02 13:22:52.751 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-04-02 13:22:52.951 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-04-02 13:22:52.960 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal-1.1.5/canal.adapter-1.1.5/plugin
2023-04-02 13:22:52.982 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: cxstar_oa-g1 succeed
2023-04-02 13:22:52.983 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-04-02 13:22:52.983 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: cxstar_oa <=============
2023-04-02 13:22:52.990 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-04-02 13:22:52.991 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-04-02 13:22:53.013 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-04-02 13:22:53.028 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.259 seconds (JVM running for 6.378)
2023-04-02 13:22:53.081 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: cxstar_oa succeed <=============


5.canal-adapter启动可能的报错问题

1.com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

[main] ERROR com.alibaba.druid.pool.DruidDataSource - init datasource error, url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 216 milliseconds ago.  The last packet sent successfully to the server was 210 milliseconds ago.

解决方法:/conf/application.yml 中的mysql连接去除&useSSL=true

2.com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]

原因:druid 包冲突
解决方法:
方法1.下载源码包 ,修改client-adapter/escore/pom.xml

<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <scope>provided</scope>
 </dependency>

打包后将client-adapter/es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar上传到服务器,替换adataper/plugin下的同名jar文件

方法2.下载v1.1.5-alpha-2,
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
找到plugin目录下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jarcanal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
上传到服务器 canal.adapter-1.1.5/plugin目录下,同时删除client-adapter.es7x-1.1.5-jar-with-dependencies.jar
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)
3.Load canal adapter: es7 failed,Name or service not known

 ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.net.UnknownHostException: http: Name or service not known
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]

解决方案:/conf/application.yml配置中 ,hosts不要带http://
4.java.lang.NullPointerException: esMapping._type

ERROR c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - esMapping._type
java.lang.NullPointerException: esMapping._type
        at com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig.validate(ESSyncConfig.java:35) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
        at com.alibaba.otter.canal.client.adapter.es.core.monitor.ESConfigMonitor$FileListener.onFileChange(ESConfigMonitor.java:102) ~[client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
        at org.apache.commons.io.monitor.FileAlterationObserver.doMatch(FileAlterationObserver.java:400) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:334) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationObserver.checkAndNotify(FileAlterationObserver.java:304) [commons-io-2.4.jar:2.4]
        at org.apache.commons.io.monitor.FileAlterationMonitor.run(FileAlterationMonitor.java:182) [commons-io-2.4.jar:2.4]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]

解决方案:canal.adapter-1.1.5/conf/es7目录下的yml中增加一个官方配置的属性
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

hosts: 192.168.0.182:9200 # ES连接地址

验证canal-adapter是否启动成功
查看日志 canal.adapter-1.1.5/logs/adapter/adapter.log

[org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=2b76ff4e,type=ConfigurationPropertiesRebinder]
2023-02-03 09:34:13.373 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2023-02-03 09:34:13.374 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2023-02-03 09:34:13.375 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:13.418 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2023-02-03 09:34:13.643 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2023-02-03 09:34:13.726 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2023-02-03 09:34:13.995 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2023-02-03 09:34:14.005 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal/canal-test/canal.adapter-1.1.5/plugin
2023-02-03 09:34:14.029 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2023-02-03 09:34:14.029 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2023-02-03 09:34:14.029 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2023-02-03 09:34:14.037 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2023-02-03 09:34:14.039 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2023-02-03 09:34:14.067 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2023-02-03 09:34:14.080 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 5.221 seconds (JVM running for 5.807)
2023-02-03 09:34:14.169 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

同步测试

建立es索引和mysql表的映射

在客户端目录canal.adapter-1.1.5/conf/es7下配置字段的映射,adapter默认会加载es路径下的所有yml文件。一个配置文件表示一张表的mapping。
建立es和mysql的映射文件test_book.yml

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esVersion: es7
esMapping:
  _index: test_book # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT
         b.id AS _id, 
         b.title,
         b.author,
         b.isbn,
         b.publisher_name as publisherName
        FROM
         test_book b"        # sql映射
  etlCondition: "where p.id>={}"   #etl的条件参数
  commitBatch: 5000   # 提交批大小

插入mysql数据验证同步

INSERT INTO `canal`.`test_book`( `title`, `isbn`, `author`, `publisher_name`) VALUES (  '三体', '98741254125', '刘慈欣', '工业出版社');

查看日志 canal.adapter-1.1.5/logs/adapter/adapter.log

2023-02-03 10:18:21.988 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":3,"title":"三体","isbn":"98741254125","author":"刘慈欣","publisher_name":"工业出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"}
2023-02-03 10:18:22.225 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":3,"title":"三体","isbn":"98741254125","author":"刘慈欣","publisher_name":"工业出版社"}],"database":"canal","destination":"example","es":1675390701000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test_book","ts":1675390701977,"type":"INSERT"} 
Affected indexes: test_book 

查看es数据,已经成功同步
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

数据同步

canal开启前的数据如何同步

canal-adapter提供一个REST接口可全量同步数据到ES,调用Client-Adapter服务的方法触发同步任务。此时,canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,canal会自动进行增量数据同步。
注意:如果数据是binlog开启前存在,则不可以使用此种方式

curl http://127.0.0.1:8081/etl/es7/test_book.yml -X POST

同步日志:

2023-02-03 10:41:35.043 [http-nio-8081-exec-1] INFO  c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - start etl to import data to index: test_book
2023-02-03 10:41:35.130 [http-nio-8081-exec-1] INFO  c.a.otter.canal.client.adapter.es7x.etl.ESEtlService - 数据全量导入完成, 一共导入 3 条数据, 耗时: 85

binlog未开启前的历史数据如何同步?

因为canal是基于binlog实现全量同步的,那么未开启binlog之前的历史数据就无法被同步,将数据库中的数据导出再重新导入一遍,这样就可以生成binlog

es数组类型同步

adapter配置文件中添加配置

  objFields:
    author: array:, #代表字段以,分割

配置更新后会监听到配置改变,无需重启

2023-02-03 11:33:24.098 [Thread-3] INFO  c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - Change a es mapping config: test_book.yml of canal adapter

更新数据,author字段

UPDATE `canal`.`test_book` SET `title` = '三体', `isbn` = '98741254125', `author` = '刘慈欣,刘电工', `publisher_name` = '工业出版社' WHERE `id` = 1;

es中的数据已改变
canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

多张表数据同步到一个索引中

yml映射文件中,主表一定要在最左侧,从表的数据改变也会自动同步到es中
示例:journal_volume 表中的数据改变,也会自动同步到journal_paper 表对应的es索引中

SELECT  
    jp.id AS _id, 
    jp.sid AS sid, 
    jp.import_id AS importId, 
    jp.journal_id AS journalId, 
    jp.journal_volume_id AS journalVolumeId, 
    jv.`year` as year,
    jv.volume as volume,
    jv.issue as issue,
    j.publisher_name as publisherName
FROM journal_paper jp 
left join journal_volume jv on jp.journal_volume_id=jv.id
left join journal j on j.id=jp.journal_id

可参考,待亲自实现
https://blog.csdn.net/qq_24950043/article/details/122643889文章来源地址https://www.toymoban.com/news/detail-401140.html

到了这里,关于canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Canal实时同步MySQL数据到ES

    canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业

    2024年02月04日
    浏览(48)
  • canal实现mysql数据实时同步到es

    最近有一个需求:原有一些mysql数据,这些数据量很大,且包含文本信息,需要对其进行搜索,这时如果使用mysql的like来匹配,效率会很低,且很可能影响整个系统的运行,经过和同事的讨论,最终决定使用es来做搜索。 但是源数据有很多关联关系,搜索的时候也会带上这些

    2024年02月16日
    浏览(42)
  • docker安装canal1.1.5监控mysql的binlog日志并配置rocketmq进行数据同步到elasticsearch(超级大干货)

    1、直接拉取canal镜像 2、创建canal文件夹,用来存在容器挂载到宿主机的目录或文件(注:本实例在/home下操作) 3、先启动canal容器,把需要挂载的目录都copy出来,本例子只挂载了conf和logs目录(自己还想挂载啥东西就进去容器里面看看呗,docker exec -it canal /bin/bash)   4、第

    2024年02月07日
    浏览(37)
  • MySQL如何实时同步数据到ES?试试阿里开源的Canal

    前几天在网上冲浪的时候发现了一个比较成熟的开源中间件——  Canal  。在了解了它的工作原理和使用场景后,顿时产生了浓厚的兴趣。今天,就让我们跟随我的脚步,一起来揭开它神秘的面纱吧。 目录 前言 简介  工作原理  MySQL主备复制原理 canal 工作原理 Canal架构  C

    2024年02月20日
    浏览(31)
  • 基于Canal同步MySQL数据到Elasticsearch

    基于 canal 同步 mysql 的数据到 elasticsearch 中。 相关软件的安装请参考:《Canal实现数据同步》 1.1 pom依赖 1.2 SimpleCanalClientExample编写 注意当后面 canal-adapter 也连接上 canal-server 后,程序就监听不到数据变化了。 这个类只是测试,下面不使用。 由于目前 canal-adapter 没有官方dock

    2024年02月07日
    浏览(39)
  • docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

    🚀 本文提供的指令完全可以按顺序逐一执行,已进行了多次测试。因此如果你是直接按照我本文写的指令一条条执行的,而非自定义修改过,执行应当是没有任何问题的。 🚀 本文讲述:使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elas

    2024年02月02日
    浏览(35)
  • docker安装canal入门实战,同步mysql数据到elasticsearch

    官方docker安装说明文档:https://github.com/alibaba/canal/wiki/Docker-QuickStart canal.adapter canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能, 目前支持功能: 客户端启动器 同步管理REST接口 日志适配器, 作为DEMO 关系型数据库的数据同步(表对表同步), ETL功能 HBase的数据同步(表对表

    2024年02月04日
    浏览(32)
  • Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性

    目录   1. 背景 2. Windows系统安装canal 3.Mysql准备工作 4. 公共依赖包 5. Redis缓存设计 6. mall-canal-service   canal [kə\\\'næl] ,译意为水道/管道/沟渠,主要用途是 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存

    2024年02月03日
    浏览(40)
  • Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器

    目录 一. 前言 二. Canal 简介和使用场景 2.1. Canal 简介 2.2. Canal 使用场景 三. Canal Server 设计 3.1. 整体设计 3.2. EventParser 设计 3.3. CanalLogPositionManager 设计 3.4. CanalHAController 类图设计 3.5. EventSink 类图设计和扩展 3.6. EventStore 类图设计和扩展 3.7. MetaManager 类图设计和扩展 四. Can

    2024年01月25日
    浏览(36)
  • 实时同步ES技术选型:Mysql+Canal+Adapter+ES+Kibana

    基于之前的文章,精简操作而来 让ELK在同一个docker网络下通过名字直接访问 Ubuntu服务器ELK部署与实践 使用 Docker 部署 canal 服务实现MySQL和ES实时同步 Docker部署ES服务,canal全量同步的时候内存爆炸,ES/Canal Adapter自动关闭,CPU100% 2.1 新建mysql docker 首先新建数据库的docker镜像

    2024年02月11日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包