【数据可靠性】Flink和Kafka连接时的精确一次保证

这篇具有很好参考价值的文章主要介绍了【数据可靠性】Flink和Kafka连接时的精确一次保证。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink写入Kafka两阶段提交

端到端的 exactly-once(精准一次)

kafka -> Flink -> kafka

1)输入端

输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)

2)Flink内部

Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义

3)输出端

两阶段提交(2PC)

写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”

如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

必须的配置

1)必须启用检查点

2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE

3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交

4)事务超时配置

【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

代码实战

kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】

public class KafkaEOSDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 【1】、启用检查点,设置为精准一次
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 2.读取 kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setGroupId("default")
                .setTopics("topic_1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        DataStreamSource<String> kafkasource = env
                .fromSource(kafkaSource,
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");

        /*
         3.写出到 Kafka
          精准一次 写入 Kafka,需要满足以下条件,【缺一不可】
          1、开启 checkpoint
          2、sink 设置保证级别为 精准一次
          3、sink 设置事务前缀
          4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092")
                // 指定序列化器:指定 Topic 名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 【3.1】 精准一次,开启 2pc
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

                // 【3.2】 精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("li-")
                // 【3.3】 设置事务超时时间
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();
        kafkasource.sinkTo(kafkaSink);
        env.execute();
    }
}

后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”

public class KafkaEOSConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 消费 在前面使用【两阶段提交】写入的 Topic
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setGroupId("default")
                .setTopics("ws")
                .setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())
                // 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】
                .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                .build();
        env
                .fromSource(kafkaSource,
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
                .print();
        env.execute();
    }
}

处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。文章来源地址https://www.toymoban.com/news/detail-786028.html

到了这里,关于【数据可靠性】Flink和Kafka连接时的精确一次保证的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 防止消息丢失与消息重复——Kafka可靠性分析及优化实践

    上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 在上一章内容中,我们解析了Kafka在读写层面上的原理,介绍了很多Kafka在读出与写入时的

    2024年02月08日
    浏览(42)
  • 深入理解TCP三次握手:连接可靠性与安全风险

    导言 TCP简介和工作原理的回顾 TCP三次握手的目的和步骤 TCP三次握手过程中可能出现的问题和安全风险 为什么TCP三次握手是必要的? 是否可以增加或减少三次握手的次数? TCP四次挥手与三次握手的异同点         在网络通信中,TCP(Transmission Control Protocol)作为一种可靠

    2024年02月14日
    浏览(39)
  • MySQL如何保证数据的可靠性(保证数据不丢失)

    只要 redo log 和 binlog 保证持久化到磁盘,就能确保MySQL异常重启后,数据可以恢复。 WAL 机制,( Write Ahead Log ): 事务先写入日志,后持久化到磁盘。 流程 每个线程内都有一个binlog cache,记录先写入binlog cache,所有线程共享一个binlog文件 binlog cache write into binlog file, binlog fi

    2024年02月09日
    浏览(38)
  • 数据湖的备份与恢复:保障数据的安全与可靠性

    数据湖是一种存储和管理大规模、多类型数据的分布式系统,它可以存储结构化、非结构化和半结构化数据,并提供数据的快速查询和分析能力。随着数据量的不断增加,数据湖的规模也不断扩大,数据的安全和可靠性变得越来越重要。因此,数据湖的备份与恢复成为了一项

    2024年03月15日
    浏览(37)
  • rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性

    消息何去何从 mandatory和immediate是channel.basicPublish方法的两个参数,都有消息传递过程中不可达目的地时将消息返回给生产者的功能。 mandatory参数 true:交换器无法根据自身的类型 和路由键找到符合条件的队列,rabbitmq调用Basic.Return命令将消息返回给生产者 生产者调用channel.

    2024年02月10日
    浏览(43)
  • 解析OpenDataPlatform的数据仓库:如何确保数据的准确性和可靠性?

    作者:禅与计算机程序设计艺术 互联网公司在实现业务需求时,一般会选择一种数据源作为基础数据,在数据处理过程需要对基础数据进行清洗、计算等加工操作。这些处理后的结果可以提供给公司内部各个部门、业务线使用,同时也方便了公司将数据用于分析报表或做决策

    2024年02月13日
    浏览(39)
  • 智能合约与数据验证技术:保障区块链系统的安全与可靠性

    区块链技术作为一种新兴的分布式数据存储和共享方式,具有很高的安全性和可靠性。然而,为了确保区块链系统的安全与可靠性,需要一些机制来保证数据的完整性和有效性。智能合约和数据验证技术就是这样一种机制,它们在区块链系统中扮演着关键的角色。 本文将从以

    2024年04月16日
    浏览(41)
  • 【可靠性测试】什么是可靠性测试:定义、方法和工具

    可靠性定义为在特定环境中指定时间段内无故障软件运行的概率。 执行可靠性测试是为了确保软件是可靠的,它满足其目的,在给定的环境中指定的时间量,并能够呈现无故障运行。 在这个机械化的世界里,现在人们盲目地相信任何软件。无论软件系统显示出什么结果,人

    2024年02月05日
    浏览(39)
  • TCP如何保证可靠性,TCP如何实现可靠性传输的

    tcp 如何保证可靠性 大家都知道TCP是可靠性传输协议,既然是可靠的,就需要解决比如包丢失了、数据被破坏了、包重复了、乱序了等等这样的问题。下面将从几个方面介绍TCP的可靠性。 1. 校验和 TCP每一段报文都有校验和,这保证了报文不被破坏或篡改,如果收到的报文在校

    2024年02月10日
    浏览(49)
  • 嵌入式硬件电路可靠性的关键问题的分析(可靠性介绍)

    :失效率 温度 可靠性 降额 器件工艺 质量与可靠性的区别 质量:时间点上去衡量                                              可靠性:一段时间上才能衡量,需要有量才能去衡量(大部分是产品量产之后才会出现问题) 质量:在时间点上衡量

    2024年03月24日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包