Flink SQL和Table API实现消费kafka写入mysql

这篇具有很好参考价值的文章主要介绍了Flink SQL和Table API实现消费kafka写入mysql。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink SQL和Table API实现消费kafka写入mysql

1、构建table环境

// 创建flink流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// table环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

2、构建source kafka

方式一:API

// Kafka连接器
Kafka kafka = new Kafka()
        .version("0.10") // 指定Kafka的版本,可选参数包含"0.8", "0.9", "0.10", "0.11", 和 "universal","universal"为通用版本
        .property("zookeeper.connect", "172.18.194.90:2181,172.18.194.91:2181,172.18.194.92:2181") // 指定Kafka集群关联的zookeeper集群的地址
        .property("bootstrap.servers", "172.18.194.90:9092,172.18.194.91:9092,172.18.194.92:9092") // 指定Kafka broker的地址
        .property("group.id", "testGroup") // 指定Kafka消费者的group id
        .topic("test_huyj"); // 指定消费的topic的名字



// 指定数据结构
Schema schema = new Schema()
        .field("opt","string")
        .field("username", "string")
        .field("password", "string");



tableEnv
        .connect(kafka)//定义表的数据来源,和外部系统建立连接
        .withFormat(new Json().failOnMissingField(false)) //定义数据格式化方法
        .withSchema(schema) //定义表结构
        .createTemporaryTable("user_table"); //创建临时表

Table user_table = tableEnv.from("user_table");//读临时表
user_table.printSchema();//打印表结构
DataStream<User> dataStream = tableEnv.toAppendStream(user_table, User.class);//转成流
dataStream.print();

方式二:Flink SQL

tableEnv.sqlUpdate( "CREATE TABLE user_table (\n" +
                    "    opt string,\n" +
                    "    username string,\n" +
                    "    password string\n" +
                    ") WITH (\n" +
                    "    'connector.type' = 'kafka', \n" + //-- 使用 kafka connector
                    "    'connector.version' = '0.10', \n" +//-- kafka 版本,universal 支持 0.11 以上的版本 -- 写universal报错,改成0.10
                    "    'connector.topic' = 'test_huyj', \n" +//-- kafka topic
                    "    'connector.startup-mode' = 'earliest-offset',\n" +
                    "    'connector.properties.0.key' = 'zookeeper.connect', \n" +//-- 连接信息
                    "    'connector.properties.0.value' = '172.18.194.90:2181,172.18.194.91:2181,172.18.194.92:2181', \n" +
                    "    'connector.properties.1.key' = 'bootstrap.servers',\n" +
                    "    'connector.properties.1.value' = '172.18.194.90:9092,172.18.194.91:9092,172.18.194.92:9092', \n" +
                    "    'update-mode' = 'append',\n" +
                    "    'format.type' = 'json', \n" +//-- 数据源格式为 json
                    "    'format.derive-schema' = 'true' \n" +//-- 从 DDL schema 确定 json 解析规则
                    ")\n");

3、构建sink mysql 

//构建sink方式1:直接执行flink sql构建sink mysql
tableEnv.sqlUpdate("CREATE TABLE mysqlOutput(\n"+
        "opt string,\n" +
        "username string,\n" +
        "password string\n" +
        ") WITH (\n" +
        "'connector.type'='jdbc',\n" +
        "'connector.url'='jdbc:mysql://172.18.194.91:13306/huyj?useUnicode=true&characterEncoding=utf-8',\n" +
        "'connector.table'='tb_user',\n" +
        "'connector.driver'='com.mysql.jdbc.Driver',\n" +
        "'connector.username'='root',\n" +
        "'connector.password'='root'" +
        ")");

4、写入将source表写入sink表

方式一:API

user_table.insertInto("mysqlOutput");

方式二:Flink SQL

tableEnv.sqlUpdate("INSERT INTO mysqlOutput\n" +
        "SELECT opt,username,password\n" +
        "FROM user_table");

5、手动执行

env.execute("kafka2mysql");

6、测试

(1)连接kafka生产者

cd  /realtime/kafkacluster/kafka_2.11-1.1.1/bin

sh kafka-console-producer.sh --broker-list 172.18.194.90:9092,172.18.194.91:9092,172.18.194.92:9092 --topic test_huyj

(2)造数据

{"opt":"1","username":"huyj","password":"111111"}

{"opt":"2","username":"huyj","password":"222222"}

{"opt":"3","username":"huyj","password":"333333"}

(3)mysql查看入库情况

flink sql 消费 kafka,java,kafka,开发语言文章来源地址https://www.toymoban.com/news/detail-793496.html

到了这里,关于Flink SQL和Table API实现消费kafka写入mysql的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月06日
    浏览(56)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(60)
  • Flink Table API 与 SQL 编程整理

    Flink API 总共分为 4 层这里主要整理 Table API 的使用 Table API 是流处理和批处理通用的关系型 API , Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的, Table API 是 Scala 和 Java 语言集成式的 API 。与常规 SQL 语言

    2024年02月04日
    浏览(54)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(50)
  • Flink Table API/SQL 多分支sink

    在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: 使用 StreamStatementSet. 具体参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    浏览(100)
  • 【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API

    《 Flink SQL 基础概念 》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL Table 运行环境、基本概念及常用 API Flink SQL 基础概念(二):数据类型 Flink SQL 基础概念(三):SQL 动态表 连续查询 Flink SQL 基础概念(四):SQL 的时间属性 Flink SQL 基础概念(五):SQL 时区问

    2024年03月21日
    浏览(79)
  • 《十堂课学习 Flink》第五章:Table API 以及 Flink SQL 入门

    第四章中介绍了 DataStream API 以及 DataSet API 的入门案例,本章开始介绍 Table API 以及基于此的高层应用 Flink SQL 的基础。 Flink 提供了两个关系API——Table API 和 SQL——用于统一的流和批处理。Table API 是一种针对Java、Scala和Python的语言集成查询API,它允许以非常直观的方式组合来

    2024年02月03日
    浏览(71)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(56)
  • Flink系列Table API和SQL之:时间属性

    基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。 所谓的时间属性(time attributes),就是每个表模式结构(schema)的一部分。可以在创建表的DDL里直接定

    2023年04月09日
    浏览(46)
  • Flink(十三)Flink 的table api与sql的基本概念、通用api介绍及入门示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包