2.1、如何在FlinkSQL中读取&写入到Kafka

这篇具有很好参考价值的文章主要介绍了2.1、如何在FlinkSQL中读取&写入到Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1、环境设置

方式1:在Maven工程中添加pom依赖

方式2:在 sql-client.sh 中添加 jar包依赖

2、读取Kafka

2.1 创建 kafka表

2.2 读取 kafka消息体(Value)

使用 'format' = 'json' 解析json格式的消息

使用 'format' = 'csv' 解析csv格式的消息

使用 'format' = 'raw' 解析kafka消息为单个字符串字段

2.3 读取 kafka消息键(Key)

2.4 读取 kafka元数据(Metadata)

2.5 如何指定起始消费位点

从指定的timestamp开始消费:

从指定的timestamp开始消费:

2.6 创建 kafka表时,添加水位线生成策略

3、写入Kafka

3.1 写入 kafka时,可以指定的元数据


1、环境设置

        Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。

        官网链接:官网

方式1:在Maven工程中添加pom依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.17.1</version>
</dependency>

方式2:在 sql-client.sh 中添加 jar包依赖

        将 flink-sql-connector-kafka-1.17.1.jar 上传到flink的lib目录下 (可以去官网下载jar包)

        或者 启动 sql-client.sh 时,指定jar依赖

bin/sql-client.sh -j lib/flink-sql-connector-kafka-1.17.1.jar

2、读取Kafka

2.1 创建 kafka表

CREATE TABLE SourceKafkaTable (
   指定物理字段,
   指定元数据字段,
   指定水位线生成策略
) WITH (
  'connector' = 'kafka',                             --【必选】指定 连接器类型,kafka用'kafka'
  'properties.bootstrap.servers' = 'localhost:9092', --【必选】指定 Kafka broker列表,用逗号分隔
  'topic' = 'user_behavior',                         --【必选】指定 topic列表,用逗号分隔
  'topic-pattern' = '.*log_kafka.*',                 --【必选】指定 匹配读取 topic 名称的正则表达式, 和 topic 配置一个即可
  'properties.group.id' = 'testGroup',               --【可选】指定 消费者组id,不指定时会自定生成 KafkaSource-{tableIdentifier}
  'scan.startup.mode' = 'earliest-offset',           --【可选】指定起始消费位点,默认值 earliest-offset
  'format' = 'csv'                                   --【必选】指定 消息的格式类型, 和 value.format 是等价的(配置一个即可)
);

2.2 读取 kafka消息体(Value)

在FlinkSQL读取kafka时,可以根据kafka存储的消息的格式,通过 'value.format' = 'csv|raw|json...'

来指定使用哪种格式来对kafka消息进行解析,并将解析的结果映射到表中的字段中去。

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式


使用 'format' = 'json' 解析json格式的消息

当 kafka消息为json格式,可以使用  'format' = 'json' 在创建表时对json串进行解析,并将解析后的结果映射到表中的字段中去

注意:这种方式只能解析单层级的json格式,多层级时无法解析

           如果为多层级json格式时,可以使用raw格式 + udf函数来对json进行解析

导入Maven的pom依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.17.1</version>
</dependency>

创建FlinkTable

-- TODO 创建用于读取kafka消息的flink表(消息格式为json)
-- kafka消息示例:{"ID":0,"NAME":"大王0"}
CREATE TABLE kafka_table_source_json (
  `ID` STRING,
  `NAME` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '20231009',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

-- TODO 解析json串时,容错性设置
'json.fail-on-missing-field' = 'false' -- 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)
'json.ignore-parse-errors' = 'true'  -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。

-- 触发读取kafka操作
select * from kafka_table_source_json;

运行结果:

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式


使用 'format' = 'csv' 解析csv格式的消息

当 kafka消息为csv格式,可以使用  'format' = 'csv' 在创建表时对csv进行解析,并将解析后的结果映射到表中的字段中去

导入Maven的pom依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>1.17.1</version>
</dependency>

创建FlinkTable

-- TODO 创建用于读取kafka消息的flink表(消息格式为csv)
-- kafka消息示例:2,3.1
CREATE TABLE kafka_table_source_csv (
  `order_id` BIGINT,
  `price` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'csv_format',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'csv'
);

-- 触发读取kafka操作
select * from kafka_table_source_csv;

运行结果:

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式


使用 'format' = 'raw' 解析kafka消息为单个字符串字段

可以使用  'format' = 'raw' 将kafka消息以原始格式映射到flink表中的string类型的字段中

创建FlinkTable

-- TODO 创建用于读取kafka消息的flink表(消息格式为json)
-- kafka消息示例:{"ID":0,"NAME":"大王0"}
CREATE TABLE kafka_table_source_raw (
  `log` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '20231009',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

-- 触发读取kafka操作
select * from kafka_table_source_raw;

运行结果:

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式


2.3 读取 kafka消息键(Key)

kafka消息信息:

{
    "key":{
        "ID_1":0,
        "NAME_1":"大王0"
    },
    "value":{
        "ID":0,
        "NAME":"大王0"
    },
    "metadata":{
        "offset":0,
        "topic":"readKey",
        "partition":0
    }
}

创建FlinkTable

-- 读取kafka消息中的key部分
CREATE TABLE kafka_table_source_read_key (
  `ID` STRING,
  `NAME` STRING,
  `ID_1` STRING,
  `NAME_1` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'readKey',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'key.fields' = 'ID_1;NAME_1',
  'value.format' = 'json'
);

2.4 读取 kafka元数据(Metadata)

创建FlinkTable

-- TODO 创建读取kafka表时,同时读取kafka元数据字段
CREATE TABLE kafka_table_source_read_metadata (
  `log` STRING,
  `topic` STRING METADATA VIRTUAL, -- 消息所属的 topic
  `partition` BIGINT METADATA VIRTUAL, -- 消息所属的 partition ID
  `offset` BIGINT METADATA VIRTUAL, -- 消息在partition中的 offset
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH (
  'connector' = 'kafka',
  'topic' = 'readKey',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

select * from kafka_table_source_read_metadata;

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式


2.5 如何指定起始消费位点

scan.startup.mode 配置项决定了 Kafka consumer 的启动模式。有效值为:

  • group-offsets:从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
  • earliest-offset:从可能的最早偏移量开始。
  • latest-offset:从最末尾偏移量开始。
  • timestamp:从用户为每个 partition 指定的时间戳开始。
    • 如果使用了 timestamp,必须使用另外一个配置项              scan.startup.timestamp-millis=时间戳(毫秒值)
  • specific-offsets:从用户为每个 partition 指定的偏移量开始。
    • 如果使用了 specific-offsets,必须使用另外一个配置项 scan.startup.specific-offsets 来为每个 partition 指定起始偏移量, 例如,选项值 partition:0,offset:42;partition:1,offset:300 表示 partition 0 从偏移量 42 开始,partition 1 从偏移量 300 开始

默认值 group-offsets 表示从 Zookeeper/Kafka 中最近一次已提交的偏移量开始消费。

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式

从指定的timestamp开始消费:

// --------------------------------------------------------------------------------------------
//  TODO 从指定的timestamp开始消费
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_startup_timestamp;
CREATE TABLE kafka_table_source_test_startup_timestamp (
  `log` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `offset` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = '20231009',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'timestamp', -- 从用户为每个 partition 指定的时间戳开始
  'scan.startup.timestamp-millis' = '1697008386973', -- 从 指定的timestamp开始(包括)消费
   'value.format' = 'raw'
);

select * 
,cast(UNIX_TIMESTAMP(cast(ts as string), 'yyyy-MM-dd HH:mm:ss.SSS') as string) || SPLIT_INDEX(cast(ts as string),'.',1) as timestamp_hmz
from kafka_table_source_test_startup_timestamp;

运行结果:

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式

从指定的timestamp开始消费:

// --------------------------------------------------------------------------------------------
//  TODO 从指定的offset开始消费
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_startup_offsets;
CREATE TABLE kafka_table_source_test_startup_offsets (
  `log` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `offset` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = '20231009',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'specific-offsets', -- 从用户为每个 partition 指定的偏移量开始
  'scan.startup.specific-offsets' = 'partition:0,offset:4', -- 为每个 partition 指定起始偏移量
   'value.format' = 'raw'
);

select * from kafka_table_source_test_startup_offsets;

运行结果:

2.1、如何在FlinkSQL中读取&写入到Kafka,# FlinkSQL 使用技巧,kafka,分布式文章来源地址https://www.toymoban.com/news/detail-715622.html


2.6 创建 kafka表时,添加水位线生成策略

// --------------------------------------------------------------------------------------------
//  TODO 创建 kafka表时,添加水位线生成策略
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_watermark;
CREATE TABLE kafka_table_source_test_watermark (
  `log` STRING,
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `offset` BIGINT METADATA VIRTUAL,
  WATERMARK FOR event_time AS event_time -- 根据kafka的timestamp,生成水位线,使用 严格递增时间戳水位线生成策略
) WITH (
  'connector' = 'kafka',
  'topic' = '20231009',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'specific-offsets', -- 从用户为每个 partition 指定的偏移量开始
  'scan.startup.specific-offsets' = 'partition:0,offset:4', -- 为每个 partition 指定起始偏移量
   'value.format' = 'raw'
);

select * from kafka_table_source_test_watermark;

3、写入Kafka

3.1 写入 kafka时,可以指定的元数据

// --------------------------------------------------------------------------------------------
//  TODO 通过flinksql向kafka写入数据(写入时指定 timestamp)
// --------------------------------------------------------------------------------------------
drop table kafka_table_source_test_startup_mode;
CREATE TABLE kafka_table_source_test_startup_mode (
  `order_id` BIGINT,
  `price` DOUBLE,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `offset` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = '20231011',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'csv'
);

insert into kafka_table_source_test_startup_mode(order_id, price,ts)
SELECT * FROM (VALUES
  (1, 2.0,TO_TIMESTAMP_LTZ(1000, 3))
, (2, 4.0,TO_TIMESTAMP_LTZ(2000, 3))
, (3, 6.0,TO_TIMESTAMP_LTZ(3000, 3))
, (4, 7.0,TO_TIMESTAMP_LTZ(4000, 3))
, (5, 8.0,TO_TIMESTAMP_LTZ(5000, 3))
, (6, 10.0,TO_TIMESTAMP_LTZ(6000, 3))
, (7, 12.0,TO_TIMESTAMP_LTZ(7000, 3))
) AS book (order_id, price,ts);

-- 触发读取kafka操作
select * from kafka_table_source_test_startup_mode;

到了这里,关于2.1、如何在FlinkSQL中读取&写入到Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第3、4章 Kafka 生产者 和 消费者 ——向 Kafka 写入数据 和读取数据

    重要的特性: 消息通过 队列来进行交换 每条消息仅会传递给一个消费者 消息传递有先后顺序,消息被消费后从队列删除(除非使用了消息优先级) 生产者或者消费者可以动态加入 传送模型: 异步即发即弃:生产者发送一条消息,不会等待收到一个响应 异步请求、应答:

    2024年02月20日
    浏览(40)
  • kafka2_企业级案例

    课程回顾: 1,从端口接收数据,通过channel ,sink 最终这个数据到日志, – 控制台输出 ,到logj.properties, nc -l localhost port , source TCP 2监控hive的日志文件,将数据输出到hdfs上存储 2.1监控单个文件的追加,读取 sink输出到日志 tail - f 文件路径 exec source .其余的不变. echo 覆盖 echo’ 追加, 使

    2024年04月16日
    浏览(46)
  • kafka2.x和3.x相关命令

    ##################### Kafka2.x命令 ##################### kafka-topics.sh --zookeeper n11hdp01:2181,n12hdp02:2181,n13hdp03:2181/kafka --create --replication-factor 3 --partitions 3 --topic event_oper_input kafka-topics.sh --zookeeper n11hdp01:2181,n12hdp02:2181,n13hdp03:2181/kafka --create --replication-factor 3 --partitions 3 --topic event_oper_output kafk

    2024年02月09日
    浏览(39)
  • kafka2.8.1升级至3.4.0教程

    上传、解压新版本kafka到/opt/kafka:kafka_2.12-3.4.0.tgz 将旧版本的config/server.properties拷贝覆盖到新版本,并且修改以下配置 将旧版本的kafka-broker-jaas.conf文件覆盖到新版本(acl权限,没做可以忽略) ps:如果还修改了config底下的其他配置文件,酌情进行修改 修改启动配置:/opt/k

    2024年02月12日
    浏览(42)
  • 手拉手安装Kafka2.13发送和消费消息

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。 Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz 解压tar -xzf kafka_2.13-3.7.0.tgz Kafka依赖Zook

    2024年04月29日
    浏览(49)
  • kafka2.x版本配置SSL进行加密和身份验证

    背景:找了一圈资料,都是东讲讲西讲讲,最后我还没搞好,最终决定参考官网说明。 官网指导手册地址:Apache Kafka 需要预备的知识,keytool和openssl 关于keytool的参考:keytool的使用-CSDN博客 关于openssl的参考:openssl常用命令大全_openssl命令参数大全-CSDN博客 先只看SSL安全机制

    2024年03月22日
    浏览(40)
  • python如何操作excel,在excel中读取和写入数据

    Excel 是 Microsoft 为使用 Windows 和 Apple Macintosh 操作系统的电脑编写的一款电子表格软件。直观的界面、出色的计算功能和图表工具,再加上成功的市场营销,使 Excel 成为最流行的个人计算机数据处理软件。在 1993 年,作为 Microsoft Office 的组件发布了5.0版之后, Excel 就开始成为

    2024年02月03日
    浏览(61)
  • centos 7 kafka2.6单机安装及动态认证SASL SCRAM配置

    目录 1.kfaka安装篇 1.1 安装jdk 1.2安装kafka 2.安全篇 2.1 kafka安全涉及3部份: 2.2 Kafka权限控制认证方式 2.3 SASL/SCRAM-SHA-256 配置实例 2.3.1 创建用户 2.3.2 创建 JAAS 文件及配置 3.测试 3.1 创建测试用户 3.2 配置JAAS 文件 3.2.1 生产者配置 3.2.2 消费者配置 3.3 消息收发测试 依赖环境:

    2024年02月07日
    浏览(37)
  • FlinkSQL kafka完整案例 可直接复制使用

    为自己记录一下flinksql 消费kafka json数据 并写入doris的完整案例 用完发现,flinksql 是真的香。 虽然尽量追求完整,但是从kafka造数据开始写,过于累赘因此省略。正文开始。 kafka原始数据 原始数据形式 flinksql 连接 准备连接sql 以下的连接器元数据可以在表定义中通过元数据列

    2024年02月14日
    浏览(39)
  • flinksql kafka到mysql累计指标练习

    数据流向:kafka -kafka -mysql 模拟写数据到kafka topic:wxt中 kafka topic :wxt1 kafka topic :wxt2 mysql结果数据: pom文件

    2024年02月08日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包