FlinkSQL kafka完整案例 可直接复制使用

这篇具有很好参考价值的文章主要介绍了FlinkSQL kafka完整案例 可直接复制使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

为自己记录一下flinksql 消费kafka json数据 并写入doris的完整案例
用完发现,flinksql 是真的香。

虽然尽量追求完整,但是从kafka造数据开始写,过于累赘因此省略。正文开始。

单表

kafka原始数据

{"id":1,"name":"nick","age":7,"address":"shanghai"}

原始数据形式

flinksql 连接

准备连接sql

    public static String kafkaTablePerson = "CREATE TABLE person (\n" +
            " id INT,\n" +
            " name STRING,\n" +
            " age INT,\n" +
            " address STRING\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'flink',\n" +
            " 'properties.bootstrap.servers' = '172.8.10.xxx:6667',\n" +
            " 'format' = 'json',\n" +
            " 'scan.startup.mode' = 'earliest-offset'\n" +
            ")\n";

以下的连接器元数据可以在表定义中通过元数据列的形式获取。

R/W 列定义了一个元数据是可读的(R)还是可写的(W)。 只读列必须声明为 VIRTUAL 以在 INSERT INTO 操作中排除它们。

数据类型 描述 R/W
topic STRING NOT NULL Kafka 记录的 Topic 名。 R
partition INT NOT NULL Kafka 记录的 partition ID。 R
headers MAP NOT NULL 二进制 Map 类型的 Kafka 记录头(Header)。 R/W
leader-epoch INT NULL Kafka 记录的 Leader epoch(如果可用)。 R
offset BIGINT NOT NULL Kafka 记录在 partition 中的 offset。 R
timestamp TIMESTAMP_LTZ(3) NOT NULL Kafka 记录的时间戳。 R/W
timestamp-type STRING NOT NULL Kafka 记录的时间戳类型。可能的类型有 “NoTimestampType”, “CreateTime”(会在写入元数据时设置),或 “LogAppendTime”。 R

以下是kafka connector的参数,只写了常用的,文末有所有参数

连接器参数
参数 是否必选 默认值 数据类型 描述
connector 必选 (无) String 指定使用的连接器,Kafka 连接器使用 ‘kafka’
topic required for sink (无) String 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 ‘topic-1;topic-2’。注意,对 source 表而言,’topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。
properties.bootstrap.servers 必选 (无) String 逗号分隔的 Kafka broker 列表。
properties.group.id 对 source 可选,不适用于 sink (无) String Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 “KafkaSource-{tableIdentifier}” 作为消费组 ID。
format 必选 (无) String 用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘value.format’ 二者必需其一。
scan.startup.mode 可选 group-offsets String Kafka consumer 的启动模式。有效值为:‘earliest-offset’‘latest-offset’‘group-offsets’‘timestamp’‘specific-offsets’

flinksql查询

    public static String selectALLPerson = "select id,name,age,address from person ";

主体方法

  		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        
        tEnv.executeSql(CreateTableSQL.kafkaTableInfo);
    	tEnv.executeSql(SelectSQL.selectALLPerson).print();

简单的几行代码就可以搞定了,和写sql 没什么区别,十分便捷。

sql筛选语句

把sql稍微复杂一点点

public static String selectPerCountNamePrint = " select name, COUNT(*) as num FROM person GROUP BY name ";

依旧可以,不过有个坑,别名不能和方法名相同,这个在其他地方似乎没有这个限制的,我试了count(*) as count 就会出现错误。

数据落盘

仅仅只是select 是不会满足使用的,需要落盘。
我这里使用的是sink doris中

    public static final String SinkDorisPerCountName = "CREATE TABLE  percountname_sink (" +
            "name STRING, \n" +
            "num BIGINT  \n" +
            ")"+
            "WITH ( \n" +
            " 'connector' = 'doris' , \n" +
            " 'fenodes' = '172.8.10.xxx:8030' , \n" +
            " 'table.identifier' = 'test_db.PerCountName' ,\n " +
            " 'username' = 'username', \n " +
            " 'password' = 'password' ,\n " +
            "  'sink.label-prefix' ='" + label + "',\n" +
            "  'sink.properties.format' = 'json',\n" +
            "  'sink.properties.read_json_by_line' = 'true'\n" +
            ")";

需要注意的是label 必须是唯一的,为了避免多次测试时遇到问题,这里写了个随机的label,确保每次都不一样。

案例中的WITH 参数基本满足使用,更多细节推荐去官网看看。

为了方便我把doris建表贴出来

CREATE TABLE IF NOT EXISTS PerCountName
(
    `name` VARCHAR(50) NOT NULL COMMENT "",
    `num` INT NOT NULL COMMENT "出现的次数"
)
UNIQUE KEY(`name`)
DISTRIBUTED BY HASH(`name`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

insertSQL语句

public static String selectPerCountNamePrint = " select name, COUNT(*) as num FROM person GROUP BY name ";
执行

现在只需要把上文的主体方法稍微改一下即可

 		tEnv.executeSql(CreateTableSQL.kafkaTablePerson);
        tEnv.executeSql(SinkDorisTableSQL.SinkDorisPerCountName);
        tEnv.executeSql(SelectSQL.selectPerCountNameInsert);

执行的时候会发现doris里并没有数据,这是因为
为了保证Flink的Exactly Once语义,Flink Doris Connector 默认开启两阶段提交,Doris在1.1版本后默认开启两阶段提交。1.0可通过修改BE参数开启。
因此需要开启checkpoint 或者修改doris BE参数

 env.enableCheckpointing(10000);
 env.setParallelism(1);

加上之后即可

还去试了一下on yarn 是没有问题的,不过会遇到报错
可以看这篇文章
https://blog.csdn.net/weixin_45399602/article/details/127526911

好了,现在单表查询和落盘已经会了,想试一下多表查询有没有问题呢

多表join

新表结构这样

{"id":0,"name":"jack","score":141,"date":"2023-01-02"}

目标为:每个name的最大score。

简单一点 直接贴了

    public static String kafkaTableInfo = "CREATE TABLE info (\n" +
            " id INT,\n" +
            " name STRING,\n" +
            " score INT,\n" +
            " `date` DATE\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'flinkinfo',\n" +
            " 'properties.bootstrap.servers' = '172.8.10.xxx:6667',\n" +
            " 'format' = 'json',\n" +
            " 'scan.startup.mode' = 'earliest-offset'\n" +
            ")\n";

 public static String selectPerMaxScore = " SELECT info.name, MAX(info.score) AS max_score FROM info JOIN person ON person.name = info.name GROUP BY info.name";

join查询

tEnv.executeSql(CreateTableSQL.kafkaTableInfo);
tEnv.executeSql(CreateTableSQL.kafkaTablePerson);
tEnv.executeSql(SelectSQL.selectPerMaxScore).print();

或者写到doris里 也是可以的,效果会比较明显。

总结

flinksql 消费kafka整体来说是十分简单好用的,使用时门口较低,实时性又高,可以经常使用。

OK,以上为案例所以内容。

以下是补充内容。


整个项目所需要的所有依赖

可以跳过文章来源地址https://www.toymoban.com/news/detail-630004.html

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.version>2.12</scala.version>
    <java.version>1.8</java.version>
    <flink.version>1.14.4</flink.version>
    <fastjson.version>1.2.62</fastjson.version>
    <hadoop.version>2.8.3</hadoop.version>
    <scope.mode>compile</scope.mode>
    <slf4j.version>1.7.30</slf4j.version>

  </properties>

  <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_${scala.version}</artifactId>
          <version>${flink.version}</version>
          <!--            本地测试时注释 scope -->
<!--          <scope>provided</scope>-->
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_${scala.version}</artifactId>
          <version>${flink.version}</version>
          <!--            本地测试时注释 scope -->
<!--          <scope>provided</scope>-->
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_${scala.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-json</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>${fastjson.version}</version>
      </dependency>
      <!-- Add log dependencies when debugging locally -->
      <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
      </dependency>
      <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>${slf4j.version}</version>
      </dependency>
      <!-- flink-doris-connector -->
      <dependency>
          <groupId>org.apache.doris</groupId>
          <artifactId>flink-doris-connector-1.14_2.12</artifactId>
          <version>1.1.0</version>
      </dependency>
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>8.0.12</version>
      </dependency>
      <dependency>
          <groupId>com.ververica</groupId>
          <artifactId>flink-connector-mysql-cdc</artifactId>
          <version>2.2.0</version>
          <exclusions>
              <exclusion>
                  <artifactId>flink-shaded-guava</artifactId>
                  <groupId>org.apache.flink</groupId>
              </exclusion>
          </exclusions>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime-web_${scala.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <!--保存检查点到hdfs上-->
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

  </dependencies>

kafka connector参数

参数 是否必选 默认值 数据类型 描述
connector 必选 (无) String 指定使用的连接器,Kafka 连接器使用 ‘kafka’
topic required for sink (无) String 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 ‘topic-1;topic-2’。注意,对 source 表而言,’topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。
topic-pattern 可选 (无) String 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,’topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。
properties.bootstrap.servers 必选 (无) String 逗号分隔的 Kafka broker 列表。
properties.group.id 对 source 可选,不适用于 sink (无) String Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 “KafkaSource-{tableIdentifier}” 作为消费组 ID。
properties.* 可选 (无) String 可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 Kafka 配置文档 中定义的配置键。Flink 将移除 “properties.” 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 ‘key.deserializer’‘value.deserializer’
format 必选 (无) String 用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘value.format’ 二者必需其一。
key.format 可选 (无) String 用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 ‘key.fields’ 也是必需的。 否则 Kafka 记录将使用空值作为键。
key.fields 可选 [] List 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 ‘field1;field2’
key.fields-prefix 可选 (无) String 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’
value.format 必选 (无) String 序列化和反序列化 Kafka 消息体时使用的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘format’ 二者必需其一。
value.fields-include 可选 ALL 枚举类型可选值:[ALL, EXCEPT_KEY] 定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。
scan.startup.mode 可选 group-offsets String Kafka consumer 的启动模式。有效值为:‘earliest-offset’‘latest-offset’‘group-offsets’‘timestamp’‘specific-offsets’。 请参阅下方 起始消费位点 以获取更多细节。
scan.startup.specific-offsets 可选 (无) String 在使用 ‘specific-offsets’ 启动模式时为每个 partition 指定 offset,例如 ‘partition:0,offset:42;partition:1,offset:300’
scan.startup.timestamp-millis 可选 (无) Long 在使用 ‘timestamp’ 启动模式时指定启动的时间戳(单位毫秒)。
scan.topic-partition-discovery.interval 可选 (无) Duration Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。
sink.partitioner 可选 ‘default’ String Flink partition 到 Kafka partition 的分区映射关系,可选值有:default:使用 Kafka 默认的分区器对消息进行分区。fixed:每个 Flink partition 最终对应最多一个 Kafka partition。round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。自定义 FlinkKafkaPartitioner 的子类:例如 ‘org.mycompany.MyPartitioner’。请参阅下方 Sink 分区 以获取更多细节。
sink.semantic 可选 at-least-once String 定义 Kafka sink 的语义。有效值为 ‘at-least-once’‘exactly-once’‘none’。请参阅 一致性保证 以获取更多细节。
sink.parallelism 可选 (无) Integer 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

到了这里,关于FlinkSQL kafka完整案例 可直接复制使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • FlinkSql写入/读取Kafka

    创建写入kafka的sink表 创建catalog 插入数据 发现kafka中已有数据 创建连接Kafka的Source表 创建iceberg表 3.插入数据 问题: 报错如下:org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId=‘null’, inTransaction=false, closed=false} Caused by: org.ap

    2024年02月15日
    浏览(41)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(44)
  • flink学习35:flinkSQL查询mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable {   def main(args: Array[String]): Unit = {     //create env     val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    浏览(38)
  • 【Flink系列七】TableAPI和FlinkSQL初体验

    Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。  Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理

    2024年02月03日
    浏览(29)
  • flinksql kafka到mysql累计指标练习

    数据流向:kafka -kafka -mysql 模拟写数据到kafka topic:wxt中 kafka topic :wxt1 kafka topic :wxt2 mysql结果数据: pom文件

    2024年02月08日
    浏览(22)
  • 【Flink】FlinkSQL中执行计划以及如何用代码看执行计划

    FilnkSQL怎么查询优化 Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如: • 基于 Apache Calcite 的子查询解相关 • 投影剪裁 • 分区剪裁 • 过滤器下推 • 子计划消除重复数据以避免重复计算 • 特殊子查询重写,包括两部

    2023年04月11日
    浏览(43)
  • 2.1、如何在FlinkSQL中读取&写入到Kafka

    目录 1、环境设置 方式1:在Maven工程中添加pom依赖 方式2:在 sql-client.sh 中添加 jar包依赖 2、读取Kafka 2.1 创建 kafka表 2.2 读取 kafka消息体(Value) 使用 \\\'format\\\' = \\\'json\\\' 解析json格式的消息 使用 \\\'format\\\' = \\\'csv\\\' 解析csv格式的消息 使用 \\\'format\\\' = \\\'raw\\\' 解析kafka消息为单个字符串字段

    2024年02月08日
    浏览(43)
  • 【Flink】FlinkSQL读取Mysql表中时间字段相差13个小时

    问题:Flink版本1.13,在我们使用FlinkSQL读取Mysql中数据的时候,发现读取出来的时间字段中的数据和Mysql表中的数据相差13个小时,Mysql建表语句及插入的数据如下; CREATE TABLE `mysql_example` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT \\\'自增ID\\\', `name` varchar(64) DEFAULT NULL COMMENT \\\'姓名\\\'

    2024年01月19日
    浏览(25)
  • FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能

    2023年04月27日
    浏览(31)
  • 2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL)

    FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据 添加Maven依赖 注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包  相关jar可以通过官网下载:JDBC SQL 连接器  FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被

    2024年02月06日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包