Background
When writing integration tests for Flink, we use the MiniClusterWithClientResource class. However, if we stop at a breakpoint and a method therefore takes a long time to run, an error occurs. How can this error be resolved?
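Handling the concurrent.TimeoutException: Heartbeat of TaskManager with id error
The key setting is heartbeat.timeout. The error is thrown by the JobManager and means that heartbeats from a TaskManager have been missing for longer than the configured time. Setting this parameter in the configuration passed to MiniClusterWithClientResource resolves it, as in the following code: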
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

// StatefulFlatMap is the function under test; its definition is not shown in this article.
public class FlinkIntegrationTest {

    // heartbeat.timeout is given in milliseconds: 300000 ms = 5 minutes,
    // which leaves time to sit at a breakpoint while debugging.
    public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {
        {
            put("heartbeat.timeout", "300000");
        }
    });

    // One mini cluster shared by all tests: 3 TaskManagers with 1 slot each.
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(config)
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(3)
                            .build());

    @Test
    public void testStateFlatMap() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure your test environment
        env.setParallelism(2);
        // values are collected in a static variable
        CollectSink.values.clear();
        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());
        // execute
        env.execute();
        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList("hello world", "hello hi world")));
    }

    @Test
    public void testStateFlatMap1() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure your test environment
        env.setParallelism(2);
        // values are collected in a static variable
        CollectSink.values.clear();
        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());
        // execute
        env.execute();
        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList("hello world", "hello hi", "hello world world")));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<String> {
        // must be static
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) throws Exception {
            values.add(value);
        }
    }
}
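To illustrate why stopping at a breakpoint triggers this error, and why raising heartbeat.timeout avoids it, here is a minimal sketch of the timeout check in plain Java. It is a simplified model, not Flink's actual heartbeat implementation: the class HeartbeatMonitor, its methods recordHeartbeat and isTimedOut, the id "tm-1", the 50000 ms comparison value, and the 2-minute pause are all hypothetical names and numbers chosen for illustration. Only the 300000 ms value comes from the configuration above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of heartbeat timeout detection. NOT Flink's real implementation.
class HeartbeatMonitor {
    private final long timeoutMillis;
    // Timestamp (in ms) of the last heartbeat seen from each TaskManager id.
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Remember when this TaskManager last reported in.
    void recordHeartbeat(String taskManagerId, long nowMillis) {
        lastHeartbeat.put(taskManagerId, nowMillis);
    }

    // True when the gap since the last heartbeat exceeds the configured timeout.
    boolean isTimedOut(String taskManagerId, long nowMillis) {
        Long last = lastHeartbeat.get(taskManagerId);
        if (last == null) {
            return false; // never seen this id, so there is nothing to time out
        }
        return nowMillis - last > timeoutMillis;
    }

    public static void main(String[] args) {
        long pauseMillis = 120_000L; // hypothetical: 2 minutes spent at a breakpoint

        HeartbeatMonitor shortTimeout = new HeartbeatMonitor(50_000L);  // hypothetical short timeout
        HeartbeatMonitor longTimeout = new HeartbeatMonitor(300_000L);  // value used in the test above

        // The last heartbeat arrives at t = 0, then the process is paused.
        shortTimeout.recordHeartbeat("tm-1", 0L);
        longTimeout.recordHeartbeat("tm-1", 0L);

        System.out.println(shortTimeout.isTimedOut("tm-1", pauseMillis)); // prints "true"
        System.out.println(longTimeout.isTimedOut("tm-1", pauseMillis));  // prints "false"
    }
}
```

With the shorter timeout the 2-minute pause is reported as a lost TaskManager, while the 5-minute timeout tolerates it. The trade-off is that a larger timeout also delays detection of a TaskManager that has really failed, so a value this large is probably best kept to debugging and test setups.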