使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

这篇具有很好参考价值的文章主要介绍了使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1. 环境介绍

2. mysql建表

3. flinksql建表

3.1 进入flinksql客户端 

​3.2 配置输出格式

​3.3 flink建表

3.4 任务流配置

4. 测试

4.1 插入测试数据

4.2 查看结果表数据​

4.3 新增测试数据

4.4 再次查看结果表数据


1. 环境介绍

服务 版本
zookeeper 3.8.0
kafka 3.3.1
flink 1.13.5
mysql 5.7.34
jdk 1.8
scala 2.12

连接器 作用
flink-sql-connector-upsert-kafka_2.11-1.13.6.jar 连接kafka,支持主键更新
flink-connector-mysql-cdc-2.0.2.jar 读mysql
flink-connector-jdbc_2.11-1.13.6.jar 写mysql
mysql-connector-java-5.1.37.jar 连接mysql

2. mysql中建表

CREATE TABLE src_mysql_order(
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`)
);

CREATE TABLE src_mysql_order_detail(
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id)
);

CREATE TABLE dim_store(
 store_id BIGINT,
 store_name varchar(100),
 PRIMARY KEY (`store_id`) 
);

CREATE TABLE dim_goods(
 goods_id BIGINT,
 goods_name varchar(100),
 PRIMARY KEY (`goods_id`)
);

CREATE TABLE dwa_mysql_order_analysis (
	store_id BIGINT,
	store_name varchar(100),
	sales_goods_distinct_nums bigint,
	sales_amt double,
	order_nums bigint,
	PRIMARY KEY (store_id,store_name)
);

3. flinksql建表

3.1 进入flinksql客户端 

sql-client.sh embedded

flink 连接mysql,flinksql,mysql,flink,kafka​3.2 配置输出格式

SET sql-client.execution.result-mode=tableau;

flink 连接mysql,flinksql,mysql,flink,kafka
3.3 flink建表

--mysql中的 订单主表
CREATE TABLE src_mysql_order(
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'src_mysql_order',
 'scan.incremental.snapshot.enabled' = 'false'
);

--mysql中的 订单明细表
CREATE TABLE src_mysql_order_detail(
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'src_mysql_order_detail',
 'scan.incremental.snapshot.enabled' = 'false'
);

--mysql中的 商店维表
CREATE TABLE dim_store(
 store_id BIGINT,
 store_name varchar(100),
 PRIMARY KEY (`store_id`) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'dim_store',
 'scan.incremental.snapshot.enabled' = 'false'
);

--mysql中的 商品维表
CREATE TABLE dim_goods(
 goods_id BIGINT,
 goods_name varchar(100),
 PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'hadoop002',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'test',
 'table-name' = 'dim_goods',
 'scan.incremental.snapshot.enabled' = 'false'
);

--kafka中的 ods层 订单表
CREATE TABLE ods_kafka_order (
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'ods_kafka_order',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'ods_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

----kafka中的 ods层 订单明细表
CREATE TABLE ods_kafka_order_detail (
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'ods_kafka_order_detail',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'ods_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

--kafka中的 dwd层 订单表
CREATE TABLE dwd_kafka_order (
 order_id BIGINT,
 store_id BIGINT,
 sales_amt double,
 PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'dwd_kafka_order',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'dwd_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

--kafka中的 dwd层 订单明细表
CREATE TABLE dwd_kafka_order_detail (
 order_id BIGINT,
 store_id BIGINT,
 goods_id BIGINT,
 sales_amt double,
 PRIMARY KEY (order_id,store_id,goods_id) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'dwd_kafka_order_detail',
 'properties.bootstrap.servers' = 'hadoop001:9092',
 'properties.group.id' = 'dwd_group1',
  'key.format' = 'json',
 'value.format' = 'json'
);

--mysql中的dwa 订单指标统计
CREATE TABLE dwa_mysql_order_analysis (
	store_id BIGINT,
	store_name varchar(100),
	sales_goods_distinct_nums bigint,
	sales_amt double,
	order_nums bigint,
	PRIMARY KEY (store_id,store_name) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://hadoop002:3306/test',
   'table-name' = 'dwa_mysql_order_analysis',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'root',
	'sink.buffer-flush.max-rows' = '10'
);

3.4 任务流配置

--任务流配置
insert into ods_kafka_order select * from src_mysql_order;
insert into ods_kafka_order_detail select * from src_mysql_order_detail;
insert into dwd_kafka_order select * from ods_kafka_order;
insert into dwd_kafka_order_detail select * from ods_kafka_order_detail;

insert into dwa_mysql_order_analysis
select 
	orde.store_id as store_id
	,store.store_name as store_name
	,count(distinct order_detail.goods_id) as sales_goods_distinct_nums
	,sum(order_detail.sales_amt) as sales_amt
	,count(distinct orde.order_id) as order_nums
from dwd_kafka_order 		as orde
join dwd_kafka_order_detail	as order_detail
on 	 orde.order_id = order_detail.order_id
join dim_store 				as store 
on 	 orde.store_id = store.store_id 
group by 
	orde.store_id
	,store.store_name 
;

查看flink管理界面,可以看到有5个正在运行的任务,实时流就配置好了

flink 连接mysql,flinksql,mysql,flink,kafka

4. 测试

4.1 插入测试数据

insert into src_mysql_order values 
(20221210001,10000,50),
(20221210002,10000,20),
(20221210003,10001,10);

insert into src_mysql_order_detail values 
(20221210001,10000,100000,30),
(20221210001,10000,100001,20),
(20221210002,10000,100001,20),
(20221210003,10001,100000,10);

insert into dim_store values 
(10000, '宇唐总店'),
(10001, '宇唐一店'),
(10002, '宇唐二店'),
(10003, '宇唐三店');

insert into dim_goods values 
(100000, '天狮达特浓缩枣浆'),
(100001, '蜜炼柚子茶');

4.2 查看结果表数据
flink 连接mysql,flinksql,mysql,flink,kafka

4.3 新增测试数据

insert into src_mysql_order values  
(20221210004,10002,50), 
(20221210005,10003,30);

insert into src_mysql_order_detail values 
(20221210004,10002,100000,30),
(20221210004,10002,100001,20),
(20221210005,10003,100000,10),
(20221210005,10003,100001,20);

 4.4 再次查看结果表数据

flink 连接mysql,flinksql,mysql,flink,kafka文章来源地址https://www.toymoban.com/news/detail-674164.html

到了这里,关于使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink SQL和Table API实现消费kafka写入mysql

    1、构建 table环境 2、构建source kafka 方式一:API 方式二:Flink SQL 3、构建sink mysql  4、写入将source表写入sink表 方式一:API 方式二:Flink SQL 5、手动执行 6、测试 (1)连接kafka生产者 (2)造数据 (3)mysql查看入库情况

    2024年01月16日
    浏览(31)
  • kerberos认证Flink的kafka connector和kafka client配置

    1. kafka配置文件 kafka jaas必须配置,如果缺少,则报一下错误。 对于Flink只能通过配置 java.security.auth.login.config 的方式。 jaas配置 1.1 方式一: System.setProperty配置系统变量: kafka_client_jaas_keytab.conf文件内容如下: 1.2 方法二:在IDEA中添加jvm参数: 注意:将参数添加至kafka 的pr

    2024年02月04日
    浏览(30)
  • 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(33)
  • 【flink番外篇】21、Flink 通过SQL client 和 table api注册catalog示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月21日
    浏览(56)
  • 30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月14日
    浏览(52)
  • 使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

    在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍

    2024年04月15日
    浏览(38)
  • 基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

    操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2 ;wsl 1会出现各种各样的问题】 软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳过此步 (1)pg安装 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出现的问题 sudo -u postgres psql 报错: psql: err

    2024年02月11日
    浏览(27)
  • 使用Flink MySQL cdc分别sink到ES、Kafka、Hudi

    [flink-1.13.1-bin-scala_2.11.tgz](https://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz) [hadoop-2.7.3.tar.gz](https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz) [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)(git clone源码编译) [hudi](https://github.com/apache/hudi)(git

    2024年02月03日
    浏览(38)
  • 【flink sql】kafka连接器

    Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 前面已经介绍了flink sql创建表的语法及说明:【flink sql】创建表 这篇博客聊聊怎么通过flink sql连接kafka 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(

    2024年02月08日
    浏览(35)
  • Flink Upsert Kafka SQL Connector 介绍

    在某些场景中,比方GROUP BY聚合之后的后果,须要去更新之前的结果值。这个时候,须要将 Kafka 记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在 Flink1.11 中,能够通过 flink-cdc-connectors 项目提供的 changelog-json format 来实现该性能。 在

    2024年02月20日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包