flink的异常concurrent.TimeoutException: Heartbeat of TaskManager with id的解决

这篇具有很好参考价值的文章主要介绍了flink的异常concurrent.TimeoutException: Heartbeat of TaskManager with id的解决。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在使用flink进行集成测试时,我们会使用MiniClusterWithClientResource类,但是当我们断点导致在某个方法执行的时间比较长时,会有错误发生,那么该如何解决这个错误呢?

处理concurrent.TimeoutException: Heartbeat of TaskManager with id错误

其实关键的配置是heartbeat.timeout,这个错误是JobManager抛出的,意思是和某个TaskManager的心跳中断超过了指定的时间,我们把这个参数配置到MiniClusterWithClientResource类中就可以了,代码如下所示:文章来源地址https://www.toymoban.com/news/detail-777808.html

public class FlinkIntegrationTest {

    public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {
        {
            put("heartbeat.timeout", "300000");
        }
    });

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config)
                    .setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());

    @Test
    public void testStateFlatMap() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));
    }

    @Test
    public void testStateFlatMap1() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));
    }



    // create a testing sink
    private static class CollectSink implements SinkFunction<String> {

        // must be static
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) throws Exception {
            values.add(value);
        }
    }


}

到了这里,关于flink的异常concurrent.TimeoutException: Heartbeat of TaskManager with id的解决的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记

    学习文档:Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记 学习笔记如下: Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。其中,Flink 总内存(Total Flink Memory)包括 JV

    2024年03月15日
    浏览(44)
  • 关于Flink,TaskManager日志问题的一个记录

    疑问:不知道大家有和我一样,开发完的flink代码推送到flink集群上执行的时候log.info(xxxx)打印的日志不会打印到task-manager节点上去(在IDEA上执行可以打印日志到控制台上),为此一直在困扰了,经过一系列的尝试,终于在java和scala中实现log.info(xxx)打印到task-manager节点上。 java代

    2024年02月16日
    浏览(37)
  • flink任务内存调优,TaskManager、JobManager内存配置

            Flink是基于java的JVM运行,拥有高效的数据处理能力,但是考虑到用户在 Flink 上运行的应用的多样性,尽管flink框架已经为所有配置项提供合理的默认值,仍无法满足所有情况下的需求。 为了给用户生产提供最大化的价值, Flink 允许用户在整体上以及细粒度上对集

    2024年02月06日
    浏览(47)
  • 【大数据】Flink 内存管理(四):TaskManager 内存分配(实战篇)

    《 Flink 内存管理 》系列(已完结),共包含以下 4 篇文章: Flink 内存管理(一):设置 Flink 进程内存 Flink 内存管理(二):JobManager 内存分配(含实际计算案例) Flink 内存管理(三):TaskManager 内存分配(理论篇) Flink 内存管理(四):TaskManager 内存分配(实战篇) 😊

    2024年03月13日
    浏览(51)
  • C# SerialPort串口ReadTimeout 超时异常。“System.TimeoutException”

    一、简介 系统采用之前的系统: 相关的链接为 https://blog.csdn.net/u011854789/article/details/51895014 https://blog.csdn.net/weixinhum/article/details/53684151 http://www.cnblogs.com/Traveller-Lee/p/6940221.html(主要参考) (一)WPF工程做上位机与彩屏(或单片机)进行串口通信、解决彩屏(或单片机)只能

    2024年02月09日
    浏览(43)
  • docker 部署flink和遇到webui 下taskmanager的无stdout 打印解决

    这个问题的解决办法我网上参考了很多教程,在下面这个教程找到了一点思路 https://blog.csdn.net/Allocator/article/details/106858679 我跟他的情况稍有不一样: 1. 他是log和stdout都没有显示,我是有log显示但是没有stdout显示 2. flink 的版本不一样,所以里面的一些脚本不太一致,按照他的

    2024年01月24日
    浏览(37)
  • flink连接kafka报:org.apache.kafka.common.errors.TimeoutException

    测试flink1.12.7 连接kafka: 执行报错如下: 经排除,找到文章:flink连接kafka报:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic_未来的资深Java架构师的博客-CSDN博客 因为工程中log4j默认等级是error,所以,先配置resource/log4j.properties,日志等级改成info: 再运行

    2024年02月06日
    浏览(48)
  • cdh6.3.2 Flink On Yarn taskmanager任务分配倾斜问题的解决办法

    Flink On Yarn任务启动 CDH:6.3.2 Flink:1.13.2 Hadoop:3.0.0 在使用FLink on Yarn调度过程中,发现taskmanager总是分配在集中的几个节点上,集群有11个节点,但每个任务启动,只用到两三个节点,导致这几台服务器负载过高,其他节点又比较空闲。 1、yarn.scheduler.fair.assignmultiple 2、yarn.s

    2024年02月12日
    浏览(40)
  • java.util.concurrent.Executionexception 异常

    今天运行时发生了如下报错。自己捣鼓半天也没发现问题出在哪儿,感谢大佬的帮助,记录下来防止再犯。。 caused by org.apache.flink.client.program.programInvocationException: Job failed。程序调用异常。网上找了很多解决方法,都没有能够解决这个问题。 直到在报错中发现了这一行: C

    2024年02月19日
    浏览(38)
  • flink连接kafka报:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic

    1、在网上搜了半天,大多数都是说需要改kafka的server.properties配置,指明0.0.0.0,外网才能访问( 其实是不对的,压根就不需要改,kafka安装好里面参数是啥就是啥 )。 2、还有说程序中引入的scala依赖需要跟Linux上运行的kafka内嵌的scala版本一致( 这个确实需要对应 ),但是改

    2024年02月12日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包