1、背景
flink消费kafka数据,多并发,实现双流join
2、现象
(1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint
(2)其中一个流的subtask显示finished
(3)无背压
3、问题原因
(1)其中一个topic分区为1
(2)配置的并行度大于kafka的partition数,导致有部分subtask空闲,然后状态变为finished
在CheckpointCoordinator类的triggerCheckpoint方法中有如下代码段
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee == null) {
LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
} else if (ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
ee.getState() == ExecutionState.RUNNING判断execution的状态是否为running,否则不做checkpoint
4、解决办法
将读取只有一个partition的topic的source任务并发改成1
5、结论
在消费kafka的数据时,source的并发度不能超过kafka的partition数,否则部分source无数据消费,导致finished,可以小于partition,但是部分subtask就会消费多个partition的数据,导致吞吐达不到最大,理想状态是source并发度等于partition数。
问题结论:在消费kafka的数据时,source的并发度不能超过kafka的partition数,可以小于partition,但是部分subtask就会消费多个partition的数据,导致吞吐达不到最大,理想状态是source并发度等于partition数。文章来源:https://www.toymoban.com/news/detail-536826.html
与下面的同学遇到了一样的问题
链接:https://www.jianshu.com/p/9110ff473280文章来源地址https://www.toymoban.com/news/detail-536826.html
到了这里,关于flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!