flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交

这篇具有很好参考价值的文章主要介绍了flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、背景
flink消费kafka数据,多并发,实现双流join
2、现象
(1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint
(2)其中一个流的subtask显示finished
(3)无背压
3、问题原因
(1)其中一个topic分区为1
(2)配置的并行度大于kafka的partition数,导致有部分subtask空闲,然后状态变为finished
在CheckpointCoordinator类的triggerCheckpoint方法中有如下代码段

// check if all tasks that we need to trigger are running.
        // if not, abort the checkpoint
        Execution[] executions = new Execution[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee == null) {
                LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                        job);
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            } else if (ee.getState() == ExecutionState.RUNNING) {
                executions[i] = ee;
            } else {
                LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                        job,
                        ExecutionState.RUNNING,
                        ee.getState());
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }

ee.getState() == ExecutionState.RUNNING判断execution的状态是否为running,否则不做checkpoint
4、解决办法
将读取只有一个partition的topic的source任务并发改成1
5、结论
在消费kafka的数据时,source的并发度不能超过kafka的partition数,否则部分source无数据消费,导致finished,可以小于partition,但是部分subtask就会消费多个partition的数据,导致吞吐达不到最大,理想状态是source并发度等于partition数。
问题结论:在消费kafka的数据时,source的并发度不能超过kafka的partition数,可以小于partition,但是部分subtask就会消费多个partition的数据,导致吞吐达不到最大,理想状态是source并发度等于partition数。

与下面的同学遇到了一样的问题
链接:https://www.jianshu.com/p/9110ff473280文章来源地址https://www.toymoban.com/news/detail-536826.html

到了这里,关于flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink checkpoint 两阶段提交协议详解

    在 Apache Flink 中,checkpoint 机制是实现容错和状态一致性的关键组件。其中,精确一次(Exactly-Once)处理模式通过两阶段提交协议来确保每个事件只被处理一次,即使在发生故障时也能保持状态的一致性。下面我将对 Flink 的 checkpoint 两阶段提交协议进行详细解释,并通过一个

    2024年04月29日
    浏览(35)
  • 大数据-玩转数据-FLINK-从kafka消费数据

    大数据-玩转数据-Kafka安装 运行本段代码,等待kafka产生数据进行消费。

    2024年02月14日
    浏览(38)
  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(37)
  • Flink使用 KafkaSource消费 Kafka中的数据

    目前,很多 flink相关的书籍和网上的文章讲解如何对接 kafka时都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已经将 FlinkKafkaConsumer标记为 deprecated(不推荐),如下: 新版本的 flink应该使用 KafkaSource来消费 kafka中的数据,详细代码如下: 开发者在工作中应该尽量避

    2024年02月15日
    浏览(37)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(80)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(42)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)
  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(48)
  • Idea本地跑flink任务时,总是重复消费kafka的数据(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中执行任务时,没法看到JobManager的错误,以至于我以为是什么特殊的原因导致任务总是反复消费。在close方法中,增加日志,发现jdbc连接被关闭了。 重新消费,jdbc连接又启动了。 注意,在Flink的函数中,open和close方法

    2024年02月07日
    浏览(39)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包