Doris通过Flink CDC接入MySQL实战

这篇具有很好参考价值的文章主要介绍了Doris通过Flink CDC接入MySQL实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 创建MySQL库表,写入demo数据

  1. 登录测试MySQL
 mysql -u root -pnew_password
  1. 创建MySQL库表,写入demo数据
CREATE DATABASE emp_1;
 USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');

注意:MySQL需要开通bin-log

  • log_bin=mysql_bin
  • binlog-format=Row
  • server-id=1

2. 创建Doris库表

  1. 创建Doris表
mysql -uroot -P9030 -h127.0.0.1
create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

3. 启动Flink

  1. 启动flink
cd /mnt/apps/flink-1.15.3/ 
#启动flink,这里服务已经启动
bin/start-cluster.sh 
#进入SQL控制台
bin/sql-client.sh embedded
  1. 创建Flink 任务:
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'new_password',
    'database-name' = 'emp_1',
    'table-name' = 'employees_1'
  );

CREATE TABLE cdc_doris_sink (
    emp_no       int ,
    birth_date   STRING,
    first_name   STRING,
    last_name    STRING,
    gender       STRING,
    hire_date    STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = '172.16.64.9:8030',
  'table.identifier' = 'demo.all_employees_info',
  'username' = 'root',
  'password' = '',
  'sink.properties.two_phase_commit'='true',
  'sink.label-prefix'='doris_demo_emp_002'
);

insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date) 
select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date  from employees_source;
  1. 输入如下地址,查看flink任务
    http://localhost:8081/#/job/running

  2. 数据验证:启动后可以看到有数据实时进入Doris了

mysql -uroot -P9030 -h127.0.0.1
mysql> select * from all_employees_info;
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date  |
+--------+------------+------------+-----------+--------+------------+
|  10001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
|  10002 | 1964-06-02 | Bezalel    | Simmel    | F      | 1985-11-21 |
|  10036 | 1959-08-10 | Adamantios | Portugali | M      | 1992-01-03 |
|  20001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
+--------+------------+------------+-----------+--------+------------+
4 rows in set (0.02 sec)
Link
  • https://zhuanlan.zhihu.com/p/532913664
  • https://www.runoob.com/mysql/mysql-install.html
  • https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/
  • https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/
Jar包地址:

flink 环境:1.15.3文章来源地址https://www.toymoban.com/news/detail-409930.html

  • https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
    解压并将jar包防止在Flink 的lib下
    flink-doris-connector:1.15
  • https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/flink-doris-connector-1.15-1.2.1.jar
    cdc mysql:flink-sql-connector-mysql-cdc-2.2.1.jar
  • https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar

到了这里,关于Doris通过Flink CDC接入MySQL实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink CDC和Flink SQL构建实时数仓Flink写入Doris

    软件环境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 开启binlog日志、创建用户 1.开启bin log MySQL 8.0默认开启了binlog,可以通过代码show variables like \\\"%log_bin%\\\";查询是否开启了,show variables like \\\"%server_id%\\\";查询服务器ID。 上图分别显示了bin long是否开启以及bin log所在的位置。 2.创建用户 C

    2024年02月02日
    浏览(78)
  • Flink进阶篇-CDC 原理、实践和优化&采集到Doris中

    基于doris官方用doris构建实时仓库的思路,从flinkcdc到doris实时数仓的实践。 原文  Apache Flink X Apache Doris 构建极速易用的实时数仓架构 (qq.com) CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。

    2023年04月08日
    浏览(47)
  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(47)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(50)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(56)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(56)
  • Flink 内容分享(二十一):通过Flink CDC一键整库同步MongoDB到Paimon

    目录 导言 Paimon CDC Demo 说明 Demo 准备 Demo 开始 总结 MongoDB 是一个比较成熟的文档数据库,在业务场景中,通常需要采集 MongoDB 的数据到数据仓库或数据湖中,面向分析场景使用。 Flink MongoDB CDC 是 Flink CDC 社区提供的一个用于捕获变更数据(Change Data Capturing)的 Flink 连接器,

    2024年01月20日
    浏览(47)
  • Flink CDC 实时mysql到mysql

    CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。 mysqlcdc需要mysql开启binlog,找到my.cnf,在 [mysqld] 中加入如下信息 [mysqld]

    2024年02月12日
    浏览(47)
  • CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi

    继上一篇 《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》 讨论了一种典型的 CDC 集成方案后,本文,我们改用 Flink CDC 完成同样的 CDC 数据入湖任务。与上一个方案有所不同的是:借助现有的 Flink 环境,我们可以直接使用 Flink CDC 从源头数据库接入数据,

    2024年02月22日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包