Flink Upsert Kafka SQL Connector 介绍

这篇具有很好参考价值的文章主要介绍了Flink Upsert Kafka SQL Connector 介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一 前言

在某些场景中,比方GROUP BY聚合之后的后果,须要去更新之前的结果值。这个时候,须要将 Kafka 记录的 key 当成主键解决,用来确定一条数据是应该作为插入、删除还是更新记录来解决。在 Flink1.11 中,能够通过 flink-cdc-connectors 项目提供的 changelog-json format 来实现该性能。

在 Flink1.12 版本中, 新增了一个 upsert connector(upsert-kafka),该 connector 扩大自现有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既能够作为 source 应用,也能够作为 sink 应用,并且提供了与现有的 kafka connector 雷同的基本功能和持久性保障,因为两者之间复用了大部分代码。

二 upsert kafka connector

Upsert Kafka Connector容许用户以upsert的形式从Kafka主题读取数据或将数据写入Kafka主题。

作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

其中每条数据记录代表一个更新或删除事件,原理如下:

  • Kafka Topic中存在相应的Key,则以UPDATE操作将Key的值更新为数据记录中的Value。
  • Kafka Topic中不存在相应的Key,则以INSERT操作将Key的值写入Kafka Topic。
  • Key对应的Value为空,会被视作DELETE操作。

三 案例

3.1 kafka 处理后写入kafka

3.1.1 创建kafka topic
$ kafka-topics --create --topic user-behavior --partitions 3 --replication-factor 2 --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-topics --create --topic after-user-behavior --partitions 3 --replication-factor 2 --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-console-producer --topic user-behavior --broker-list cdh68:9092,cdh69:9092,cdh70:9092
$ kafka-console-consumer --topic user-behavior --from-beginning --group test-user --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092      
$ kafka-console-consumer --topic after-user-behavior --from-beginning --group test --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
3.2 Flink SQL
3.2.1 source
%flink.ssql
drop table if exists user_behavior;
CREATE TABLE user_behavior (
    id BIGINT,
    name STRING,
    flag STRING
) WITH (
    'connector' = 'kafka',  -- 使用 kafka connector
    'topic' = 'user-behavior',  -- kafka topic
    'properties.group.id'='cdc', -- 消费者组
    'scan.startup.mode' = 'latest-offset',  -- 从起始 offset 开始读取
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.bootstrap.servers' = 'cdh68:9092,cdh69:9092,cdh70:9092',  -- kafka broker 地址
    'format' = 'json'  -- 数据源格式为 json
);
3.2.2 sink
%flink.ssql
drop table if exists after_user_behavior;
CREATE TABLE after_user_behavior (
  name STRING,
  pv BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'after-user-behavior',
  'properties.bootstrap.servers' = 'cdh68:9092,cdh69:9092,cdh70:9092',
  'value.json.fail-on-missing-field' = 'false',
  'key.json.ignore-parse-errors' = 'true',
  'key.format' = 'json',
  'value.format' = 'json'
);

一定要设置主键 Primar要使用 upsert-kafka connector,DDL语句中,一定要设置 PRIMARY KEY 主键,并为键(key.format)和值(value.format)指定序列化反序列化格式。

当数据源端进行了增删改,对应的 pv 结果就会同步更新,这就是 upsert kafka 的魅力。

这是基于kafka的统计计算,前提条件是 topic user-behavior中的数据是 changelog 格式的。

3.2.3 transform
%flink.ssql
INSERT INTO after_user_behavior
SELECT
  name,
  COUNT(*)
FROM user_behavior 
GROUP BY name;

注意:after_user_behavior 必须为 upsert-kafka connector

如果after_user_behavior为 kafka connector,执行此语句则会报如下错误:

java.io.IOException: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.after_user_behavior' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS EXPR$1])

因为语句SELECT name, COUNT(*) FROM user_behavior GROUP BY name; 通过group by后数据是不断更新变化的,因此只能用 upsert-kafka connector。

3.3 输出结果
3.3.1 kafka user-behavior producer
[song@cdh68 ~]$ kafka-console-producer --topic user-behavior --broker-list cdh68:9092,cdh69:9092,cdh70:9092
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"info","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"info","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"UPDATE","table":"user","ts":6852139698555588608}
>{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"DELETE","table":"user","ts":6852139698555588608}

topic user-behavior中的数据是 changelog 格式的。

3.3.2 kafka user-behavior consumer
[song@cdh70 ~]$ kafka-console-consumer --topic user-behavior --group test-user --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"info","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Lucy","id":"67","type":"INSERT","table":"info","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"INSERT","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"UPDATE","table":"user","ts":6852139698555588608}
{"schema":"presto","flag":false,"name":"Mars","id":"85","type":"DELETE","table":"user","ts":6852139698555588608}
3.3.3 kafka after-user-behavior consumer
[song@cdh69 ~]$ kafka-console-consumer --topic after-user-behavior --group test --bootstrap-server cdh68:9092,cdh69:9092,cdh70:9092
{"name":"Mars","pv":1}
{"name":"Lucy","pv":1}
{"name":"Mars","pv":2}
{"name":"Lucy","pv":2}
{"name":"Mars","pv":3}
{"name":"Mars","pv":4}
{"name":"Mars","pv":5}
{"name":"Mars","pv":6}
3.3.4 FlinkSQL user_behavior

Flink Upsert Kafka SQL Connector 介绍,大数据从入门到精通,flink,kafka

从此结果可以看出 kafka 和 upsert-kafka 的区别:

kafka 的结果则显示所有数据,upsert-kafka则显示更新后的最新数据。

3.3.5 FlinkSQL alfter_user_behavior

Flink Upsert Kafka SQL Connector 介绍,大数据从入门到精通,flink,kafka

此结果是动态变化的,变化与kafka after-user-behavior consumer相同。

可见,upsert-kafka 表存储了所有变化的数据,但是读取时,只读取最新的数据。

3.2 flink-pageviews-demo

https://github.com/fsk119/flink-pageviews-demo

3.2.1 测试数据准备

在 Mysql 中执行以下命令:

CREATE DATABASE flink;
USE flink;

CREATE TABLE users (
  user_id BIGINT,
  user_name VARCHAR(1000),
  region VARCHAR(1000)
);

INSERT INTO users VALUES 
(1, 'Timo', 'Berlin'),
(2, 'Tom', 'Beijing'),
(3, 'Apple', 'Beijing');

现在,我们利用Sql client在Flink中创建相应的表。

CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'database-name' = 'flink',
  'table-name' = 'users',
  'username' = 'root',
  'password' = '123456'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  view_time TIMESTAMP(3),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

并利用Flink 往 Kafka中灌入相应的数据

INSERT INTO pageviews VALUES
  (1, 101, TO_TIMESTAMP('2020-11-23 15:00:00')),
  (2, 104, TO_TIMESTAMP('2020-11-23 15:00:01.00'));
3.2.2 将 left join 结果写入 kafka

我们首先测试是否能将Left join的结果灌入到 Kafka 之中。

首先,我们在 Sql client 中创建相应的表

CREATE TABLE enriched_pageviews (
  user_id BIGINT,
  user_region STRING,
  page_id BIGINT,
  view_time TIMESTAMP(3),
  WATERMARK FOR view_time as view_time - INTERVAL '5' SECOND,
  PRIMARY KEY (user_id, page_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'enriched_pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

并利用以下语句将left join的结果插入到kafka对应的topic之中。

INSERT INTO enriched_pageviews
SELECT pageviews.user_id, region, pageviews.page_id, pageviews.view_time
FROM pageviews
LEFT JOIN users ON pageviews.user_id = users.user_id;

利用以下命令,我们可以打印topic内的数据kafka-console-consumer.sh --bootstrap-server kafka:9094 --topic "enriched_pageviews" --from-beginning --property print.key=true

#预期结果
{"user_id":1,"page_id":101}	{"user_id":1,"user_region":null,"page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104}	{"user_id":2,"user_region":null,"page_id":104,"view_time":"2020-11-23 15:00:01"}
{"user_id":1,"page_id":101}	null
{"user_id":1,"page_id":101}	{"user_id":1,"user_region":"Berlin","page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104}	null
{"user_id":2,"page_id":104}	{"user_id":2,"user_region":"Beijing","page_id":104,"view_time":"2020-11-23 15:00:01"}

Left join中,右流发现左流没有join上但已经发射了,此时会发送DELETE消息,而非UPDATE-BEFORE消息清理之前发送的消息。详见org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator#processElement

我们可以进一步在mysql中删除或者修改一些数据,来观察进一步的变化。

UPDATE users SET region = 'Beijing' WHERE user_id = 1;

DELETE FROM users WHERE user_id = 1;
3.2.3 将聚合结果写入kafka

我们进一步测试将聚合的结果写入到 Kafka 之中。

在Sql client 中构建以下表

CREATE TABLE pageviews_per_region (
  user_region STRING,
  cnt BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
)

我们再用以下命令将数据插入到upsert-kafka之中。

INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*)
FROM enriched_pageviews
WHERE user_region is not null
GROUP BY user_region;

我们可以通过以下命令查看 Kafka 中对应的数据

./kafka-console-consumer.sh --bootstrap-server kafka:9094 --topic "pageviews_per_region" --from-beginning --property print.key=true

# 预期结果
{"user_region":"Berlin"}	{"user_region":"Berlin","cnt":1}
{"user_region":"Beijing"}	{"user_region":"Beijing","cnt":1}
{"user_region":"Berlin"}	null
{"user_region":"Beijing"}	{"user_region":"Beijing","cnt":2}
{"user_region":"Beijing"}	{"user_region":"Beijing","cnt":1}

Flink Upsert Kafka SQL Connector 介绍,大数据从入门到精通,flink,kafka文章来源地址https://www.toymoban.com/news/detail-828927.html

到了这里,关于Flink Upsert Kafka SQL Connector 介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【flink-sql实战】flink 主键声明与upsert功能实战

    主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的 某个(些)列 是唯一的 并且不包含 Null 值 。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。 主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,

    2024年02月03日
    浏览(42)
  • Flink 实时数仓关键技术解读:Upsert Kafka 和 动态表(Dynamic Table)

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(48)
  • Flink Kafka[输入/输出] Connector

    本章重点介绍生产环境中最常用到的 Flink kafka connector 。使用 Flink 的同学,一定会很熟悉 kafka ,它是一个分布式的、分区的、多副本的、 支持高吞吐的、发布订阅消息系统。生产环境环境中也经常会跟 kafka 进行一些数据的交换,比如利用 kafka consumer 读取数据,然后进行一系

    2024年02月04日
    浏览(40)
  • Iceberg从入门到精通系列之七:Flink SQL创建Catalog

    type:必须是iceberg catalog-type:内置了hive和hadoop两种catalog,也可以使用catalog-impl来自定义catalog。 catalog-impl:自定义catalog实现的全限定类名。如果未设置catalog-type,则必须设置。 property-version:描述属性版本的版本号。此属性可用于向后兼容,以防属性格式更改。当前属性版本

    2024年02月11日
    浏览(58)
  • Iceberg从入门到精通系列之八:flink sql 创建Iceberg表

    建表命令支持最常用的flink建表语法,包括: PARTITION BY(column1,column2,…):配置分区,apache flink不支持隐藏分区。 COMMENT ‘table document’:指定表的备注 WITH(‘key’=‘value’,…):设置表属性

    2024年02月11日
    浏览(66)
  • (五)kafka从入门到精通之topic介绍

    Kafka是一个流行的分布式消息系统,它的核心是一个由多个节点组成的分布式集群。在Kafka中,数据被分割成多个小块,并通过一些复杂的算法在节点之间传递。这些小块被称为Kafka Topic。 一个Topic是一组具有相同主题的消息。可以将Topic看作是一个数据仓库,在这个仓库中存

    2024年02月12日
    浏览(35)
  • kerberos认证Flink的kafka connector和kafka client配置

    1. kafka配置文件 kafka jaas必须配置,如果缺少,则报一下错误。 对于Flink只能通过配置 java.security.auth.login.config 的方式。 jaas配置 1.1 方式一: System.setProperty配置系统变量: kafka_client_jaas_keytab.conf文件内容如下: 1.2 方法二:在IDEA中添加jvm参数: 注意:将参数添加至kafka 的pr

    2024年02月04日
    浏览(40)
  • Flink SQL Hive Connector使用场景

    目录 1.介绍 2.使用 2.1注册HiveCatalog 2.2Hive Read 2.2.1流读关键配置 2.2.2示例

    2024年02月06日
    浏览(44)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

           今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。         之前我们已经用过了一些简单的内置连接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官网:

    2024年01月24日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包