6.2、Flink数据写入到Kafka

这篇具有很好参考价值的文章主要介绍了6.2、Flink数据写入到Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1、添加POM依赖

2、API使用说明

3、序列化器

3.1 使用预定义的序列化器

3.2 使用自定义的序列化器

4、容错保证级别

4.1 至少一次 的配置

4.2 精确一次 的配置

5、这是一个完整的入门案例


1、添加POM依赖

Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

<!-- 引入 kafka连接器依赖-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>

2、API使用说明

KafkaSink 可将数据流写入一个或多个 Kafka topic。

官网链接:官网链接

6.2、Flink数据写入到Kafka,# Flink API 使用技巧,flink,kafka,linq

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()  // 泛型为 输入输入的类型
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers(brokers)
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        // TODO 必填项:配置容错保证级别 精准一次、至少一次、不做任何保证
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

3、序列化器

序列化器的作用是将flink数据转换成 kafka的ProducerRecord

6.2、Flink数据写入到Kafka,# Flink API 使用技巧,flink,kafka,linq

3.1 使用预定义的序列化器

功能:将 DataStream 数据转换为 Kafka消息中的value,key为默认值null,timestamp为默认值

// 初始化 KafkaSink 实例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("20230912")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
        )
        .build();

3.2 使用自定义的序列化器

功能:可以对 kafka消息的key、value、partition、timestamp进行赋值

/**
 * 如果要指定写入kafka的key,可以自定义序列化器:
 * 		1、实现 一个接口,重写 序列化 方法
 * 		2、指定key,转成 字节数组
 * 		3、指定value,转成 字节数组
 * 		4、返回一个 ProducerRecord对象,把key、value放进去
 */
// 初始化 KafkaSink 实例 (自定义 KafkaRecordSerializationSchema 实例)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(
                new KafkaRecordSerializationSchema<String>() {

                    @Nullable
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                        String[] datas = element.split(",");
                        byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                        byte[] value = element.getBytes(StandardCharsets.UTF_8);
                        Long currTimestamp = System.currentTimeMillis();
                        Integer partition = 0;
                        return new ProducerRecord<>("20230913", partition, currTimestamp, key, value);
                    }
                }
        )
        .build();

4、容错保证级别

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee

  • DeliveryGuarantee.NONE   不提供任何保证
    • 消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复
  • DeliveryGuarantee.AT_LEAST_ONCE  至少一次
    • sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。
    • 消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE 精确一次
    • 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。
    • 因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。
    • 然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。
    • 请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

4.1 至少一次 的配置

DataStream<String> stream = ...;

// 初始化 KafkaSink 实例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("20230912")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
        )
        // TODO 必填项:配置容灾保证级别设置为 至少一次
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

stream.sinkTo(sink);

4.2 精确一次 的配置

// 如果是精准一次,必须开启checkpoint
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()  // 泛型为 输入输入的类型
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers(brokers)
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        // TODO 必填项:配置容灾保证级别设置为 精准一次
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // 如果是精准一次,必须设置 事务的前缀
        .setTransactionalIdPrefix("flink-")
        // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
        .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
        .build();
        
stream.sinkTo(sink);

5、这是一个完整的入门案例

需求:Flink实时读取 socket数据源,将读取到的数据写入到Kafka (要保证不丢失,不重复)

开发语言:java1.8

flink版本:flink1.17.0文章来源地址https://www.toymoban.com/news/detail-709426.html

package com.baidu.datastream.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

// TODO flink 数据输出到kafka
public class SinkKafka {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 如果是精准一次,必须开启checkpoint
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // 2.指定数据源
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3.初始化 KafkaSink 实例
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // TODO 必填项:配置 kafka 的地址和端口
                .setBootstrapServers("worker01:9092")
                // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("20230912")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // TODO 必填项:配置容灾保证级别设置为 精准一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("flink-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
                .build();

        streamSource.sinkTo(kafkaSink);

        // 3.触发程序执行
        env.execute();
    }
}

到了这里,关于6.2、Flink数据写入到Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(37)
  • flink:通过table api把文件中读取的数据写入MySQL

    当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作 文件info.txt

    2024年03月15日
    浏览(32)
  • flink写入到kafka 大坑解析。

    1.kafka能不能发送null消息?    能! 2 flink能不能发送null消息到kafka? 不能!     这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。 这种问题会造成什么后果? flink直接挂掉。 如果我们采取了失败重试机制会怎样? 数据重

    2024年02月15日
    浏览(31)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(38)
  • flink日志实时采集写入Kafka/ElasticSearch

    由于公司想要基于flink的日志做实时预警功能,故需要实时接入,并刷入es进行分析。 日志接入必须异步,不能影响服务性能 kafka集群宕机,依旧能够提交flink任务且运行任务 kafka集群挂起恢复,可以依旧续写实时运行日志 在类上加上@Plugin注解,标记为自定义appender 在类加上

    2024年02月08日
    浏览(43)
  • 记一次Flink通过Kafka写入MySQL的过程

    一、前言 总体思路:source --transform --sink ,即从source获取相应的数据来源,然后进行数据转换,将数据从比较乱的格式,转换成我们需要的格式,转换处理后,然后进行sink功能,也就是将数据写入的相应的数据库DB中或者写入Hive的HDFS文件存储。 思路: pom部分放到最后面。 二

    2024年01月24日
    浏览(36)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(31)
  • 使用Flink处理Kafka中的数据

    目录         使用Flink处理Kafka中的数据 前提:  一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据           具体代码(scala)                 具体执行代码①                 重要逻

    2024年01月23日
    浏览(37)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(30)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(64)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包