Flink CDC和Flink SQL构建实时数仓Flink写入Doris

这篇具有很好参考价值的文章主要介绍了Flink CDC和Flink SQL构建实时数仓Flink写入Doris。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

软件环境 Flink1.13.3

Scala 2.12

doris 0.14

一、MySQL 开启binlog日志、创建用户

1.开启bin log

MySQL 8.0默认开启了binlog,可以通过代码show variables like "%log_bin%";查询是否开启了,show variables like "%server_id%";查询服务器ID。
flinksql实时数仓项目,flink,sql,java

上图分别显示了bin long是否开启以及bin log所在的位置。

2.创建用户

CREATE USER 'flinktest' IDENTIFIED BY '123456'; 

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinktest';

如果遇到报错:

Your password does not satisfy the current policy requirements

Mysql8版本输入

set global validate_password.policy=0; set global validate_password.length=6;

如果是mysql5.6版本 set global validate_password_policy=LOW;set global validate_password_length=6;

二、添加依赖

到仓库服务或者这里下载 cdc依赖flink-connector-mysql-cdc-2.0.2.jar 添加到$FLINK_HOME/lib下面

这里一定要注意一下cdc和flink版本的匹配关系,否则执行SQL的时候会报错:

[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;

具体如下表:

Flink CDC Connector Version Flink Version
1.0.0 1.11.*
1.1.0 1.11.*
1.2.0 1.12.*
1.3.0 1.12.*
1.4.0 1.13.*
2.0.* 1.13.*

 文章来源地址https://www.toymoban.com/news/detail-786697.html

三、建表

1.MySQL建表:

CREATE TABLE flink_test(id INT ,name VARCHAR(20));

2.Doris建表

2.1启动doris

不懂如何启动可以看这里

2.2Flink连接doris驱动

Flink连接doris需要flink-doris-connector包,如果你懒得编译,可以从这边下载,下面的编译步骤就免了。

驱动编译过程:

首先到Doris官网把整个项目下载下来,然后解压
unzip incubator-doris-master.zip
cd incubator-doris-master/extension/flink-doris-connector
./build.sh  

 如果遇到报错./build.sh: Permission denied  那就修改权限 chmod 777 build.sh

如果遇到报错./build.sh: line 43: mvn: command not found
Error: mvn is not found 那就安装一下maven可以看到这里

等到N久之后,然后你可能遇到报错,无力吐槽啊:

[ERROR] thrift failed output:
[ERROR] thrift failed error: /bin/sh: /opt/pkg/incubator-doris-master/extension/flink-doris-connector/../../thirdparty/installed/bin/thrift: No such file or directory
[INFO] BUILD FAILURE
[ERROR] Failed to execute goal org.apache.thrift.tools:maven-thrift-plugin:0.1.11:compile (thrift-sources) on project doris-flink: thrift did not exit cleanly. Review output for more information. -> [Help 1]

好吧,那就安装thrift咯。安装过程中可能有报C++错误configure: No compiler with C++11 support was found,那就yum install -y gcc gcc-c++安装一下
#版本别太新哈0.93就行,不然可能报错
1.下载
wget http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz
或者wget http://archive.apache.org/dist/thrift/0.9.3/
2.解压编译
tar -zxf thrift-0.9.3.tar.gz
cd thrift-0.9.3
./configure --with-lua=no && make && make install
3.验证
thrift -version
4.把thrift复制到thirdparty/installed/bin 目录下,目录如果不存在需要手工创建
cp /usr/local/bin/thrift /opt/pkg/incubator-doris-master/thirdparty/installed/bin

又等待N久,继续执行./build.sh

注意,默认flink版本是1.12版本,如果是1.13版本,需要修改incubator-doris-master/extension/flink-doris-connector下面的pom.xml把property修改一下

flinksql实时数仓项目,flink,sql,java

虽然短短几行代码,但是踩坑了不少,等待时间又很久,如果有人不想编译,可以到这边下载我编译好的。注意我这个flink版本是1.13.3,scala版本是2.12哈。

2.3 Doris建表

 mysql -h 172.16.37.29 -P 9030 -uroot

CREATE TABLE test_cnt
(
    id int,
    name varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2");

 3.启动flink并建表

3.1启动fink

在$FLINK_HOME/bin目录输入./start-cluster.sh

3.2 flink建表

输入./sql-client.sh embedded启动FLINK SQL客户端,FLINK SQL有表模式,日志变更模式和Tableau模式,本次采用表模式,所以启动之后输入 SET sql-client.execution.result-mode=table;
创建mysql source:
 

 CREATE TABLE flink_mysql_src(
 id INT NOT NULL,
 name STRING
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '139.xxx.xx.xx',
 'port' = '3306',
 'username' = 'xxxx',
 'password' = 'xxx',
 'database-name' = 'xx',
 'table-name' = 'flink_test',
 'scan.incremental.snapshot.enabled' = 'false'
);

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

看一下是否能监控到MySQL数据:
 

在mysql中输入
insert into flink_test values(1,'a');
insert into flink_test values(2,'b');
insert into flink_test values(3,'c');
在flink sql输入:
select * from flink_mysql_src;

可以看到结果已经输出到flink控制台了,说明flink到mysql这端数据传输是OK的

如果遇到报错:ClassNotFoundException: com.ververica.cdc.debezium.DebeziumSourceFunction那就把flink-connector-debezium-2.0.2.jar也放到lib目录下面

创建doris sink:
CREATE TABLE flink_doris_sink (
    id int,
    name string
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = 'localhost:8030',
      'table.identifier' = 'zh.test_cnt',
      'sink.batch.size'='2',
      'username' = 'root',
      'password'=''
);

select * from flink_doris_sink看看有没有报错。 

flinksql实时数仓项目,flink,sql,java

 

 

如果报错[ERROR] Could not execute SQL statement. Reason:
java.lang.RuntimeException: can not fetch partitions 说明数据库不存在或者表不存在,注意看建表语句。

如果报错[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.doris.flink.table.DorisRowDataInputForma 说明doris-flink-1.0-SNAPSHOT.jar编译有问题,看看自己版本对不对,不对重新改一下pom重新编译

四、实践

1.flink执行任务

INSERT INTO flink_doris_sink
SELECT id,name
FROM flink_mysql_src;
可以到flink网页端看到任务的情况了
flinksql实时数仓项目,flink,sql,java

2.往mysql插入数据

insert into flink_test values(1,'a');
在doris 中查询,发现数据已经过来了

 flinksql实时数仓项目,flink,sql,java

3.变更数据

在mysql中执行update flink_test  set name='tests' where id=1

在doris中查询发现数据已经变更了,不过变成了两条记录,flink_doris_connector暂时不支持删除,据说后续版本会更新,那就期待一下吧。 
flinksql实时数仓项目,flink,sql,java

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:

 

到了这里,关于Flink CDC和Flink SQL构建实时数仓Flink写入Doris的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(39)
  • NineData:通过一个SQL语句构建实时数仓

    随着企业数据量呈现出爆炸式增长,跨部门、跨应用、跨平台的数据交互需求越来越频繁,传统的数据查询方式已经难以满足这些需求。同时,不同数据库系统之间的数据格式、查询语言等都存在差异,直接进行跨库查询十分困难。 虽然 MySQL、Oracle、PostgreSQL 等数据库系统都

    2024年02月05日
    浏览(38)
  • 如何基于 Apache Doris 与 Apache Flink 快速构建极速易用的实时数仓

    随着大数据应用的不断深入,企业不再满足离线数据加工计算的时效,实时数据需求已成为数据应用新常态。伴随着实时分析需求的不断膨胀,传统的数据架构面临的成本高、实时性无法保证、组件繁冗、运维难度高等问题日益凸显。为了适应业务快速迭代的特点,帮助企业

    2024年02月12日
    浏览(35)
  • Apache Flink X Apache Doris构建极速易用的实时数仓架构

    大家好,我叫王磊。是SelectDB 大数据研发。今天给大家带来的分享是《Apache Flink X Apache Doris构建极速易用的实时数仓架构》。 下面是我们的个人介绍:我是Apache Doris Contributor 和阿里云 MVP。同时著有《 图解 Spark 大数据快速分析实战》等书籍。 接下来咱们进入本次演讲的正题

    2023年04月24日
    浏览(31)
  • 基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

    操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2 ;wsl 1会出现各种各样的问题】 软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳过此步 (1)pg安装 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出现的问题 sudo -u postgres psql 报错: psql: err

    2024年02月11日
    浏览(27)
  • 实时数仓建设第2问:怎样使用flink sql快速无脑统计当天下单各流程(已发货,确认收货等等)状态的订单数量

    实时统计当天下单各流程状态(已支付待卖家发货,卖家通知物流揽收,待买家收货等等)中的订单数量。 订单表的binlog数据发送到kafka,flink从kafka接受消息进行指标统计。因为每笔订单的状态会发生变化,比如上午为【已支付待卖家发货】,这个时候【已支付待卖家发货】指标

    2024年02月16日
    浏览(30)
  • Mysql+ETLCloud CDC+StarRocks实时数仓同步实战

    大型企业需要对各种业务系统中的销售及营销数据进行实时同步分析,例如库存信息、对帐信号、会员信息、广告投放信息,生产进度信息等等,这些统计分析信息可以实时同步到StarRocks中进行分析和统计,StarRocks作为分析型数据库特别适合于对海量数据的存储和分析,我们

    2024年02月16日
    浏览(28)
  • flink-cdc,clickhouse写入,多路输出

    kafka日志数据从kafka读取 1、关联字典表:完善日志数据 2、判断日志内容级别:多路输出 低级:入clickhouse 高级:入clickhouse的同时推送到kafka供2次数据流程处理。

    2024年02月09日
    浏览(32)
  • Flink电商实时数仓(四)

    业务数据:数据都是MySQL中的表格数据, 使用Flink SQL 处理 日志数据:分为page页面日志(页面信息,曝光信息,动作信息,报错信息)和启动日志(启动信息,报错信息),使用Flink Stream API处理 五种日志数据: “start”; 启动信息 “err”; 错误信息 “display”; 曝光信息 “ac

    2024年01月17日
    浏览(38)
  • Flink实时电商数仓(八)

    主要任务:从kafka页面日志主题读取数据,统计 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。 读取kafka页面主题数据 转换数据结构: String - JSONObject 过滤数据,u

    2024年02月03日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包