Syncing MySQL Data to Elasticsearch and Kafka with Dinky + FlinkSQL + Flink CDC

I. Overview

Dinky is an out-of-the-box, one-stop real-time computing platform built on Apache Flink. It connects many frameworks such as OLAP engines and data lakes, and focuses on unified stream-batch and lakehouse architectures. This article uses it as the Flink SQL visualization tool.

Flink SQL makes it straightforward to develop streaming applications with standard SQL, with no application code required.

Flink CDC: this article uses the MySQL CDC connector, which reads both snapshot (full) data and incremental (binlog) data from a MySQL database.
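
Note that the MySQL CDC connector reads incremental data from the binlog, so the source MySQL instance must have binlog enabled in ROW format. A minimal my.cnf sketch (the server-id value is arbitrary but must be unique within your replication topology):

[mysqld]
server-id        = 1
log-bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL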

Environment and main software versions

  • kafka_2.12-3.4.0.tgz
  • flink-1.14.6-bin-scala_2.12.tgz
  • flink-sql-connector-mysql-cdc-2.3.0.jar
  • flink-sql-connector-elasticsearch7_2.12-1.14.6.jar
  • flink-sql-connector-kafka_2.12-1.14.6.jar
  • dlink-release-0.7.3.tar.gz
  • Elasticsearch 7.x
  • Java 8
  • MySQL 5.7.17
  • kafka-map, a Kafka visualization tool (the latest version requires JDK 17; even older versions require JDK 11)

II. Software Installation and Deployment

This article uses the simplest possible single-node setup throughout.

1. Flink

tar -xzf flink-1.14.6-bin-scala_2.12.tgz
cd flink-1.14.6 && ls -l
### Edit flink-1.14.6/conf/flink-conf.yaml; change the IP addresses and paths to match your machine
jobmanager.rpc.address: 192.168.xxx.xxx
taskmanager.host: 192.168.xxx.xxx
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 10
rest.port: 8888
rest.address: 0.0.0.0
web.tmpdir: /home/soft/flink_/flink-1.14.6/tmp
akka.ask.timeout: 600s
### Copy flink-sql-connector-kafka_2.12-1.14.6.jar, flink-sql-connector-elasticsearch7_2.12-1.14.6.jar,
### and flink-sql-connector-mysql-cdc-2.3.0.jar into flink-1.14.6/lib/
### Start the cluster
./bin/start-cluster.sh

After a successful start, the Flink web UI is available at http://192.168.xxx.xxx:8888/
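
To double-check that both daemons are up, jps should list one JobManager and one TaskManager process (class names as of Flink 1.14 in standalone mode; the PIDs will differ):

jps -l | grep flink
# 23041 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
# 23385 org.apache.flink.runtime.taskexecutor.TaskManagerRunner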

2. Kafka

tar -xzf kafka_2.12-3.4.0.tgz
cd kafka_2.12-3.4.0
bin/kafka-storage.sh random-uuid > uuid
vi config/kraft/server.properties
controller.quorum.voters=1@192.168.XXX.XXX:9093
advertised.listeners=PLAINTEXT://192.168.XXX.XXX:9092
log.dirs=/home/soft/kafka_2.12-3.4.0/kraft-combined-logs
### Format the Kafka log directory with the UUID generated above
bin/kafka-storage.sh format -t `cat uuid` -c config/kraft/server.properties
Formatting /home/soft/kafka_2.12-3.4.0/kraft-combined-logs with metadata.version 3.4-IV0.

### Start the Kafka Server
bin/kafka-server-start.sh -daemon config/kraft/server.properties
### Once the Kafka server has started successfully, you have a basic Kafka environment running and ready to use.
jps -l
14085 kafka.Kafka
#### Notes on the start commands ##############################################
### 1. Foreground start:
### bin/kafka-server-start.sh config/server.properties
### Description:
### Runs in the current terminal window; if the window is closed or Ctrl+C is
### pressed, the Kafka process is killed and the broker shuts down.
### 2. Background start, either of:
### (1) nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
### (2) bin/kafka-server-start.sh -daemon config/server.properties
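
The sink topic used later in this article can also be created up front (depending on auto.create.topics.enable, the broker may otherwise auto-create it on first write). A sketch with the stock CLI tools, assuming the broker address configured above:

### Create the topic the Flink job will write to
bin/kafka-topics.sh --create --topic test_kakfa --partitions 1 \
    --replication-factor 1 --bootstrap-server 192.168.XXX.XXX:9092
### Verify that it exists
bin/kafka-topics.sh --list --bootstrap-server 192.168.XXX.XXX:9092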

3. Dinky

Deployment reference: http://www.dlink.top/docs/next/deploy_guide/build

  1. Initialize the database
# Log in to MySQL
mysql -uroot -p123456
# Create the database
mysql>
create database dinky;
# Grant privileges
mysql>
grant all privileges on dinky.* to 'dinky'@'%' identified by 'dinky@123' with grant option;
mysql>
flush privileges;
# Log in as the dinky user from here on
mysql -h fdw1 -udinky -pdinky@123
mysql>
use dinky;
# The SQL file is included in the downloaded release archive
mysql> source /home/soft/dinky/sql/dinky.sql
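
One caveat: the grant ... identified by form above is MySQL 5.7 syntax, matching the MySQL 5.7.17 used in this article. On MySQL 8 the user must be created first:

mysql> CREATE USER 'dinky'@'%' IDENTIFIED BY 'dinky@123';
mysql> GRANT ALL PRIVILEGES ON dinky.* TO 'dinky'@'%' WITH GRANT OPTION;
mysql> FLUSH PRIVILEGES;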

2. Deploy Dinky

tar -zxvf dlink-release-0.7.3.tar.gz
mv dlink-release-0.7.3 dinky
# Switch to the config directory and edit the database connection and port settings
cd /home/soft/dinky/config/
vi application.yml
# -------------------------------------------------------------------------------
spring:
    datasource:
        url: jdbc:mysql://${MYSQL_ADDR:192.168.XXX.XXX:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
        username: ${MYSQL_USERNAME:dinky}
        password: ${MYSQL_PASSWORD:dinky@123}
server:
    port: 8899 
# -------------------------------------------------------------------------------
# Load dependencies: Dinky needs its own Flink environment. Create a
# plugins/flink${FLINK_VERSION} folder under the Dinky root directory and copy in
# the required Flink dependencies, such as flink-dist, flink-table, etc.
cp ${FLINK_HOME}/lib/flink-*.jar /home/soft/dinky/plugins/flink1.14/
# The folder should then contain the following jars
flink-csv-1.14.6.jar
flink-dist_2.12-1.14.6.jar
flink-json-1.14.6.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-elasticsearch7_2.12-1.14.6.jar
flink-sql-connector-kafka_2.12-1.14.6.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
# Once done, start Dinky
sh auto.sh start 1.14

Once started, visit http://192.168.XXX.XXX:8899/ and log in with admin/admin to work with the system through the UI.

III. Creating and Submitting Jobs

Scenario: MySQL currently holds four tables: t1, t2, t3, and t4. The result of joining t1 and t2 must be synced to the Elasticsearch index search01_index, and the result of joining t3 and t4 must be synced to the same index search01_index. The Elasticsearch index itself is generated automatically at startup by a Spring Data Elasticsearch project, so only the data needs to be synced. The data is also pushed to Kafka for other downstream consumers.
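
For reference, the index mapping generated by Spring Data Elasticsearch would look roughly like the sketch below. This is hypothetical: the real mapping depends on the entity annotations in that project; the field names simply mirror the sink schema used later in this article (id becomes the document _id rather than a mapped field):

curl -X PUT "http://192.168.XXX.XXX:9200/search01_index" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "name":        { "type": "text" },
      "eventDetail": { "type": "text" },
      "country":     { "type": "keyword" },
      "time":        { "type": "keyword" },
      "entityType":  { "type": "keyword" },
      "entityName":  { "type": "keyword" },
      "type":        { "type": "byte" }
    }
  }
}'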

1. Using Flink's built-in sql-client.sh

Flink SQL to database field type mapping reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/#data-type-mapping
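
For the tables below, the relevant entries from that page are, for example, MySQL TINYINT ↔ Flink TINYINT, MySQL TEXT ↔ Flink STRING, and MySQL DATETIME ↔ Flink TIMESTAMP; this article keeps things simple by declaring most source columns as STRING.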

MySQL to Elasticsearch

### From the flink/bin directory, start the SQL client with ./sql-client.sh
Flink SQL> SET execution.checkpointing.interval = 3s;
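### Note: the MySQL CDC source records its binlog position on checkpoints, so
### checkpointing should be enabled; 3s suits a demo, while production jobs
### usually use a larger interval.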

### source 1
Flink SQL> CREATE TABLE IF NOT EXISTS test1 (
    id STRING,
    name STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test1',
    'table-name' = 'test1'
);

### source 2
Flink SQL> CREATE TABLE IF NOT EXISTS test2 (
    id   STRING,
    doc_id STRING,
    event_detail STRING,
    country STRING,
    `time` STRING,
    entity_type STRING,
    entity_name STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test2',
    'table-name' = 'test2'
);

### source 3
Flink SQL> CREATE TABLE IF NOT EXISTS test3 (
    id STRING,
    title STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test3',
    'table-name' = 'test3'
);

### source 4
Flink SQL> CREATE TABLE IF NOT EXISTS test4 (
     id STRING,
     `time` STRING,
     country STRING,
     entity_type STRING,
     entity_name STRING,
     detail STRING,
     fo_id STRING,
     del_flag TINYINT,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test4',
    'table-name' = 'test4'
);

### sink
Flink SQL> CREATE TABLE IF NOT EXISTS search01_index (
    id           STRING,
    name         STRING,
    eventDetail STRING,
    country      STRING,
    `time`       STRING,
    entityType  STRING,
    entityName  STRING,
    type         TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://192.168.XXX.XXX:9200',
    'index' = 'search01_index',
    'sink.bulk-flush.max-actions' = '1'
);
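### Note: 'sink.bulk-flush.max-actions' = '1' flushes every single record to
### Elasticsearch immediately, which makes the demo easy to observe but hurts
### throughput; real workloads should keep the default or use a larger batch.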

### transform 1: the IDs of t1 and t3 rows may collide in the shared index, so the IDs are prefixed and concatenated.
Flink SQL> INSERT INTO search01_index
    SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
    t1.name,
    t2.event_detail                   AS eventDetail,
    t2.country,
    t2.`time`,
    t2.entity_type                    AS entityType,
    t2.entity_name                    AS entityName,
    t2.type
    FROM test1 t1
    LEFT JOIN test2 t2 ON t1.id = t2.doc_id
    WHERE t1.del_flag = 0
    AND t2.del_flag = 0;
    
### transform 2: same ID-collision guard, with the 'fo_' prefix for t3/t4 rows.
Flink SQL> INSERT INTO search01_index
    SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
    t3.title                         AS name,
    t4.detail                        AS eventDetail,
    t4.country,
    t4.`time`,
    t4.entity_type                   AS entityType,
    t4.entity_name                   AS entityName,
    t3.type
    FROM test3 t3
    LEFT JOIN test4 t4 ON t3.id = t4.fo_id
    WHERE t3.del_flag = 0
    AND t4.del_flag = 0;

Once these statements have been submitted, open the Flink UI at http://192.168.XXX.XXX:8888/#/job/running to see the Running Jobs. Then insert, update, and delete rows in the MySQL tables and watch the changes propagate into Elasticsearch.
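
A quick way to verify from the shell, assuming the Elasticsearch host from the sink DDL:

curl "http://192.168.XXX.XXX:9200/search01_index/_search?pretty&size=5"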

MySQL to Kafka

### From the flink/bin directory, start the SQL client with ./sql-client.sh
Flink SQL> SET execution.checkpointing.interval = 3s;

### source 1
Flink SQL> CREATE TABLE IF NOT EXISTS test1 (
    id STRING,
    name STRING,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test1',
    'table-name' = 'test1'
);

### source 2
Flink SQL> CREATE TABLE IF NOT EXISTS test2 (
    id   STRING,
    doc_id STRING,
    event_detail STRING,
    country STRING,
    `time` STRING,
    entity_type STRING,
    entity_name STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test2',
    'table-name' = 'test2'
);

### source 3
Flink SQL> CREATE TABLE IF NOT EXISTS test3 (
    id STRING,
    title STRING,
    type TINYINT,
    del_flag TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test3',
    'table-name' = 'test3'
);

### source 4
Flink SQL> CREATE TABLE IF NOT EXISTS test4 (
     id STRING,
     `time` STRING,
     country STRING,
     entity_type STRING,
     entity_name STRING,
     detail STRING,
     fo_id STRING,
     del_flag TINYINT,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.XXX.XXX',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'schema_of_test4',
    'table-name' = 'test4'
);

### sink
Flink SQL> CREATE TABLE IF NOT EXISTS test_kakfa (
    id           STRING,
    name         STRING,
    eventDetail STRING,
    country      STRING,
    `time`       STRING,
    entityType  STRING,
    entityName  STRING,
    type         TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_kakfa',
    'properties.bootstrap.servers' = '192.168.XXX.XXX:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'debezium-json.ignore-parse-errors'='true'
);
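### Note: 'scan.startup.mode' only applies if this table is later read back as a
### source; the INSERT statements below use it purely as a sink. The debezium-json
### format encodes the full changelog (inserts, updates, deletes) as Kafka messages.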

### transform 1: prefix-concatenate the IDs, as in the Elasticsearch job above.
Flink SQL> INSERT INTO test_kakfa
    SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
    t1.name,
    t2.event_detail                   AS eventDetail,
    t2.country,
    t2.`time`,
    t2.entity_type                    AS entityType,
    t2.entity_name                    AS entityName,
    t2.type
    FROM test1 t1
    LEFT JOIN test2 t2 ON t1.id = t2.doc_id
    WHERE t1.del_flag = 0
    AND t2.del_flag = 0;
    
### transform 2: same ID-collision guard, with the 'fo_' prefix for t3/t4 rows.
Flink SQL> INSERT INTO test_kakfa
    SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
    t3.title                         AS name,
    t4.detail                        AS eventDetail,
    t4.country,
    t4.`time`,
    t4.entity_type                   AS entityType,
    t4.entity_name                   AS entityName,
    t3.type
    FROM test3 t3
    LEFT JOIN test4 t4 ON t3.id = t4.fo_id
    WHERE t3.del_flag = 0
    AND t4.del_flag = 0;

After execution, open the Kafka-map UI to confirm that data has arrived in Kafka, in the following format:

{"before":null,"after":{"id":"fo_48_50","name":"标题啊","eventDetail":"内容2","country":"日本","time":"2023-06-01","entityType":"实体2","entityName":"名称2","type":3},"op":"c"}

2. Using Dinky

  • After logging in to the Dinky UI, open the Registration Center menu and register the Flink environment

  • Then open the Data Development menu and create the SQL tasks

  • mysql2kafka-1 contents
    CREATE TABLE IF NOT EXISTS test1 (
        id STRING,
        name STRING,
        del_flag TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test1',
        'table-name' = 'test1'
    );
     CREATE TABLE IF NOT EXISTS test2 (
        id   STRING,
        doc_id STRING,
        event_detail STRING,
        country STRING,
        `time` STRING,
        entity_type STRING,
        entity_name STRING,
        type TINYINT,
        del_flag TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test2',
        'table-name' = 'test2'
    );
    
    CREATE TABLE IF NOT EXISTS test_kakfa (
        id           STRING,
        name         STRING,
        eventDetail STRING,
        country      STRING,
        `time`       STRING,
        entityType  STRING,
        entityName  STRING,
        type         TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test_kakfa',
        'properties.bootstrap.servers' = '192.168.XXX.XXX:9092',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json',
        'debezium-json.ignore-parse-errors'='true'
    );
    INSERT INTO test_kakfa
    SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
        t1.name,
        t2.event_detail                   AS eventDetail,
        t2.country,
        t2.`time`,
        t2.entity_type                    AS entityType,
        t2.entity_name                    AS entityName,
        t2.type
    FROM test1 t1
    LEFT JOIN test2 t2 ON t1.id = t2.doc_id
    WHERE t1.del_flag = 0
    AND t2.del_flag = 0;
  • mysql2kafka-2 contents
    CREATE TABLE IF NOT EXISTS test3 (
        id STRING,
        title STRING,
        type TINYINT,
        del_flag TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test3',
        'table-name' = 'test3'
    );
    CREATE TABLE IF NOT EXISTS test4 (
         id STRING,
         `time` STRING,
         country STRING,
         entity_type STRING,
         entity_name STRING,
         detail STRING,
         fo_id STRING,
         del_flag TINYINT,
         PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test4',
        'table-name' = 'test4'
    );
    CREATE TABLE IF NOT EXISTS test_kakfa (
        id           STRING,
        name         STRING,
        eventDetail STRING,
        country      STRING,
        `time`       STRING,
        entityType  STRING,
        entityName  STRING,
        type         TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test_kakfa',
        'properties.bootstrap.servers' = '192.168.XXX.XXX:9092',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-json',
        'debezium-json.ignore-parse-errors'='true'
    );
    INSERT INTO test_kakfa
    SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
        t3.title                         AS name,
        t4.detail                        AS eventDetail,
        t4.country,
        t4.`time`,
        t4.entity_type                   AS entityType,
        t4.entity_name                   AS entityName,
        t3.type
    FROM test3 t3
    LEFT JOIN test4 t4 ON t3.id = t4.fo_id
    WHERE t3.del_flag = 0
    AND t4.del_flag = 0;
  • mysql2es-1 contents
    CREATE TABLE IF NOT EXISTS test1 (
        id STRING,
        name STRING,
        del_flag TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test1',
        'table-name' = 'test1'
    );
     CREATE TABLE IF NOT EXISTS test2 (
        id   STRING,
        doc_id STRING,
        event_detail STRING,
        country STRING,
        `time` STRING,
        entity_type STRING,
        entity_name STRING,
        type TINYINT,
        del_flag TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test2',
        'table-name' = 'test2'
    );
    CREATE TABLE IF NOT EXISTS search01_index (
        id           STRING,
        name         STRING,
        eventDetail STRING,
        country      STRING,
        `time`       STRING,
        entityType  STRING,
        entityName  STRING,
        type         TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://192.168.XXX.XXX:9200',
        'index' = 'search01_index',
        'sink.bulk-flush.max-actions' = '1'
    );
    INSERT INTO search01_index
    SELECT CONCAT('doc_', t1.id, '_', t2.id) AS id,
        t1.name,
        t2.event_detail                   AS eventDetail,
        t2.country,
        t2.`time`,
        t2.entity_type                    AS entityType,
        t2.entity_name                    AS entityName,
        t2.type
    FROM test1 t1
    LEFT JOIN test2 t2 ON t1.id = t2.doc_id
    WHERE t1.del_flag = 0
    AND t2.del_flag = 0;
  • mysql2es-2 contents
    CREATE TABLE IF NOT EXISTS test3 (
        id STRING,
        title STRING,
        type TINYINT,
        del_flag TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test3',
        'table-name' = 'test3'
    );
    CREATE TABLE IF NOT EXISTS test4 (
         id STRING,
         `time` STRING,
         country STRING,
         entity_type STRING,
         entity_name STRING,
         detail STRING,
         fo_id STRING,
         del_flag TINYINT,
         PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.XXX.XXX',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'schema_of_test4',
        'table-name' = 'test4'
    );
    CREATE TABLE IF NOT EXISTS search01_index (
        id           STRING,
        name         STRING,
        eventDetail STRING,
        country      STRING,
        `time`       STRING,
        entityType  STRING,
        entityName  STRING,
        type         TINYINT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://192.168.XXX.XXX:9200',
        'index' = 'search01_index',
        'sink.bulk-flush.max-actions' = '1'
    );
    INSERT INTO search01_index
    SELECT CONCAT('fo_', t3.id, '_', t4.id) AS id,
        t3.title                         AS name,
        t4.detail                        AS eventDetail,
        t4.country,
        t4.`time`,
        t4.entity_type                   AS entityType,
        t4.entity_name                   AS entityName,
        t3.type
    FROM test3 t3
    LEFT JOIN test4 t4 ON t3.id = t4.fo_id
    WHERE t3.del_flag = 0
    AND t4.del_flag = 0;
  • Run each task, then check the Flink UI

  • Check Kafka-map

  • Check with the Elasticsearch Head plugin

And that's it. I am still not deeply familiar with Dinky, so there may well be a better approach; feedback and discussion are welcome.
