Flink 中kafka broker缩容导致Task一直重启

这篇具有很好参考价值的文章主要介绍了Flink 中kafka broker缩容导致Task一直重启。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

Flink版本 1.12.2
Kafka 客户端 2.4.1
在公司的Flink平台运行了一个读Kafka计算DAU的流程序,由于公司Kafka的缩容,直接导致了该程序一直在重启,重启了一个小时都还没恢复(具体的所容操作是下掉了四台kafka broker,而当时flink配置了12台kafka broker),当时具体的现场如下:

JobManaer上的日志如下:
2023-10-07 10:02:52.975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)]]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 @ task-xxxx-shanghai.emr.aliyuncs.com (dataPort=xxxx).
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
        at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
        at java.lang.Thread.run(Thread.java:750)


对应的 TaskManager(task-xxxx-shanghai.emr.aliyuncs.com)上的日志如下:

2023-10-07 10:02:24.604 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya.local/192.168.129.46:9092) could not be established. Broker may not be available.


2023-10-07 10:02:52.939 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
        at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
        at java.lang.Thread.run(Thread.java:750)

2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available.
2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker sxxxx:909 (id: -4 rack: null) disconnected
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available.
2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected


2023-10-07 10:08:15.541 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

当时Flink中kafka source的相关配置如下:

scan.topic-partition-discovery.interval  300000
restart-strategy.type fixed-delay
restart-strategy.fixed-delay.attempts 50000000
jobmanager.execution.failover-strategy region

结论以及解决

目前在kafka 消费端有两个参数default.api.timeout.ms(默认60000),request.timeout.ms(默认30000),这两个参数来控制kakfa的客户端从服务端请求超时,也就是说每次请求的超时时间是30s(当然不是一次请求broker的超时时间,见后续其他),超时之后可以再重试,如果在60s内请求没有得到任何回应,则会报TimeOutException,具体的见如下分析,
我们在flink kafka connector中通过设置如下参数来解决:

`properties.default.api.timeout.ms` = '600000',
`properties.request.timeout.ms` = '5000',
// max.block.ms是设置kafka producer的超时
`properties.max.block.ms` = '600000',

分析

在Flink中对于Kafka的Connector的DynamicTableSourceFactoryKafkaDynamicTableFactory,这里我们只讨论kafka作为source的情况,
而该类的方法createDynamicTableSource最终会被调用,至于具体的调用链可以参考Apache Hudi初探(四)(与flink的结合)–Flink Sql中hudi的createDynamicTableSource/createDynamicTableSink/是怎么被调用–只不过把Sink改成Source就可以了,所以最终会到KafkaDynamicSource类:

@Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);

        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);

        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

        return SourceFunctionProvider.of(kafkaConsumer, false);
    }

该类的getScanRuntimeProvider方法会被调用,所有kafka相关的操作都可以追溯到FlinkKafkaConsumer类(继承FlinkKafkaConsumerBase)中,对于该类重点的方法如下:

    @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();

        this.unionOffsetStates =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                createStateSerializer(getRuntimeContext().getExecutionConfig())));

       ... 
    }

   @Override
    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode
        this.offsetCommitMode =
                OffsetCommitModes.fromConfiguration(
                        getIsAutoCommitEnabled(),
                        enableCommitOnCheckpoints,
                        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        // create the partition discoverer
        this.partitionDiscoverer =
                createPartitionDiscoverer(
                        topicsDescriptor,
                        getRuntimeContext().getIndexOfThisSubtask(),
                        getRuntimeContext().getNumberOfParallelSubtasks());
        this.partitionDiscoverer.open();

        subscribedPartitionsToStartOffsets = new HashMap<>();
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
        if (restoredState != null) {
            ...
        } else {
            // use the partition discoverer to fetch the initial seed partitions,
            // and set their initial offsets depending on the startup mode.
            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
            // determined
            // when the partition is actually read.
            switch (startupMode) {
                。。。
                default:
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        subscribedPartitionsToStartOffsets.put(
                                seedPartition, startupMode.getStateSentinel());
                    }
            }

            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                switch (startupMode) {
                    ...
                    case GROUP_OFFSETS:
                        LOG.info(
                                "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                subscribedPartitionsToStartOffsets.size(),
                                subscribedPartitionsToStartOffsets.keySet());
                }
            } else {
                LOG.info(
                        "Consumer subtask {} initially has no partitions to read from.",
                        getRuntimeContext().getIndexOfThisSubtask());
            }
        }

        this.deserializer.open(
                RuntimeContextInitializationContextAdapters.deserializationAdapter(
                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
    }

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // initialize commit metrics and default offset callback method
        this.successfulCommits =
                this.getRuntimeContext()
                        .getMetricGroup()
                        .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
        this.failedCommits =
                this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
        final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

        this.offsetCommitCallback =
                new KafkaCommitCallback() {
                    @Override
                    public void onSuccess() {
                        successfulCommits.inc();
                    }

                    @Override
                    public void onException(Throwable cause) {
                        LOG.warn(
                                String.format(
                                        "Consumer subtask %d failed async Kafka commit.",
                                        subtaskIndex),
                                cause);
                        failedCommits.inc();
                    }
                };

        // mark the subtask as temporarily idle if there are no initial seed partitions;
        // once this subtask discovers some partitions and starts collecting records, the subtask's
        // status will automatically be triggered back to be active.
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

        LOG.info(
                "Consumer subtask {} creating fetcher with offsets {}.",
                getRuntimeContext().getIndexOfThisSubtask(),
                subscribedPartitionsToStartOffsets);
        // from this point forward:
        //   - 'snapshotState' will draw offsets from the fetcher,
        //     instead of being built from `subscribedPartitionsToStartOffsets`
        //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
        //     Kafka through the fetcher, if configured to do so)
        this.kafkaFetcher =
                createFetcher(
                        sourceContext,
                        subscribedPartitionsToStartOffsets,
                        watermarkStrategy,
                        (StreamingRuntimeContext) getRuntimeContext(),
                        offsetCommitMode,
                        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                        useMetrics);

        if (!running) {
            return;
        }

        if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
            kafkaFetcher.runFetchLoop();
        } else {
            runWithPartitionDiscovery();
        }
    }

    @Override
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        ...
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                        currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    kafkaTopicPartitionLongEntry.getKey(),
                                    kafkaTopicPartitionLongEntry.getValue()));
                }
          ... 
        }
    }

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
            ...
            fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
            ...
    }

主要是initializeStateopen,run,snapshotState,notifyCheckpointComplete这四个方法,下面带着问题逐一介绍一下:
注意:对于initializeStateopen方法的先后顺序,可以参考StreamTask类,其中如下的调用链:

invoke()
 ||
 \/
beforeInvoke()
 ||
 \/
operatorChain.initializeStateAndOpenOperators
 ||
 \/
FlinkKafkaConsumerBase.initializeState
 ||
 \/
FlinkKafkaConsumerBase.open

就可以知道 initializeState方法的调用是在open之前的

initializeState方法

这里做的事情就是从持久化的State中恢复kafkaTopicOffset信息,我们这里假设是第一次启动

open方法

  • offsetCommitMode
    offsetCommitMode = OffsetCommitModes.fromConfiguration 这里获取设置的kafka offset的提交模式,这里会综合enable.auto.commit的配置(默认是true),enableCommitOnCheckpoints默认是true,checkpointing设置为true(默认是false),综合以上得到的值为OffsetCommitMode.ON_CHECKPOINTS
  • partitionDiscoverer
    这里主要是进行kafka的topic的分区发现,主要路程是 partitionDiscoverer.discoverPartitions,这里的涉及的流程如下:
    AbstractPartitionDiscoverer.discoverPartitions
      ||
      \/
    AbstractPartitionDiscoverer.getAllPartitionsForTopics 
      ||
      \/
    KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor
      ||
      \/
    KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)) //这里的defaultApiTimeoutMs 来自于*default.api.timeout.ms*
      ||
      \/
    Fetcher.getTopicMetadata //这里面最后抛出 new TimeoutException("Timeout expired while fetching topic metadata");
      ||
      \/
    Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode //这里会根据某种策略选择配置的broker的节点
      ||
      \/
    client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // 这里的 *defaultRequestTimeoutMs* 来自配置*request.timeout.ms*
    
    
    综上所述,discoverPartitions做的就是根据某种策略选择配置的broker节点,对每个节点进行请求,request.timeout.ms超时后,再根据策略选择broker,直至总的时间达到了配置的default.api.timeout.ms,这里默认default.api.timeout.ms 为60秒,request.timeout.ms为30秒
  • subscribedPartitionsToStartOffsets
    根据startupMode模式,默认是StartupMode.GROUP_OFFSETS(默认从上次消费的offset开始消费),设置开启的kafka offset,这在kafkaFetcher中会用到

run方法

  • 设置一些指标successfulCommits/failedCommits
  • KafkaFetcher
    这里主要是从kafka获取数据以及如果有分区发现则循环进kafka的topic分区发现,这里会根据配置scan.topic-partition-discovery.interval默认配置为0,实际中设置的为300000,即5分钟。该主要的流程为在方法runWithPartitionDiscovery:
      private void runWithPartitionDiscovery() throws Exception {
          final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
          createAndStartDiscoveryLoop(discoveryLoopErrorRef);
    
          kafkaFetcher.runFetchLoop();
    
          // make sure that the partition discoverer is waked up so that
          // the discoveryLoopThread exits
          partitionDiscoverer.wakeup();
          joinDiscoveryLoopThread();
    
          // rethrow any fetcher errors
          final Exception discoveryLoopError = discoveryLoopErrorRef.get();
          if (discoveryLoopError != null) {
              throw new RuntimeException(discoveryLoopError);
          }
      }
    
    
    • createAndStartDiscoveryLoop 这个会启动单个线程以while sleep方式实现以scan.topic-partition-discovery.interval为间隔来轮询进行Kafka的分区发现,注意这里会吞没Execption,并不会抛出异常

       private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
         discoveryLoopThread =
                 new Thread(
                         ...
                         while (running) {
                           ...
                                     try {
                                         discoveredPartitions =
                                                 partitionDiscoverer.discoverPartitions();
                                     } catch (AbstractPartitionDiscoverer.WakeupException
                                             | AbstractPartitionDiscoverer.ClosedException e) {
                                       
                                         break;
                                     }
                                     if (running && !discoveredPartitions.isEmpty()) {
                                         kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                     }
      
                                     if (running && discoveryIntervalMillis != 0) {
                                         try {
                                             Thread.sleep(discoveryIntervalMillis);
                                         } catch (InterruptedException iex) {
                                             break;
                                         }
                                     }
                                 }
                             } catch (Exception e) {
                                 discoveryLoopErrorRef.set(e);
                             } finally {
                                 // calling cancel will also let the fetcher loop escape
                                 // (if not running, cancel() was already called)
                                 if (running) {
                                     cancel();
                                 }
                             }
                         },
                         "Kafka Partition Discovery for "
                                 + getRuntimeContext().getTaskNameWithSubtasks());
      
         discoveryLoopThread.start();
      }
      

      这里的kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);subscribedPartitionStates变量会把发现分区信息保存起来,这在kafkaFetcher.runFetchLoop中会设置已经提交的offset信息,并且会在snapshotState会用到

    • kafkaFetcher.runFetchLoop 这里会从kafka拉取数据,并设置kafka的offset,具体的流程如下:

       runFetchLoop 
          ||
          \/
        subscribedPartitionStates 这里会获取*subscribedPartitionStates*变量
          ||
          \/
        partitionConsumerRecordsHandler
          ||
          \/
        emitRecordsWithTimestamps
          ||
          \/
        emitRecordsWithTimestamps
          ||
          \/
        partitionState.setOffset(offset);
      

      这里的offset就是从消费的kafka记录中获取的

snapshotState方法

这里会对subscribedPartitionStates中的信息进行处理,主要是加到pendingOffsetsToCommit变量中

  • offsetCommitMode
    这里上面说到是OffsetCommitMode.ON_CHECKPOINTS,如果是ON_CHECKPOINTS,则会从fetcher.snapshotCurrentState获取subscribedPartitionStates
    并加到pendingOffsetsToCommit,并持久化到unionOffsetStates中,这实际的kafka offset commit操作在notifyCheckpointComplete中,

notifyCheckpointComplete方法

获取到要提交的kafka offset信息,并持久化保存kafka中

其他

之前说的request.timeout.ms 并不是一次请求,而是从目前的实现来看,kafka client 会对访问失败的broker,一直访问直到超时(默认30秒):文章来源地址https://www.toymoban.com/news/detail-725267.html

[2023-10-13 13:42:18] 7673 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 7723 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 7773 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 7823 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 7874 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 7924 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 7974 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 8025 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 8075 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:18] 8125 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8175 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8226 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8276 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8326 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8376 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8426 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8477 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8527 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8577 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8627 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
[2023-10-13 13:42:19] 8678 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)

参考

  • open 和 initailizeState的初始化顺序
  • A single failing Kafka broker may cause jobs to fail indefinitely with TimeoutException: Timeout expired while fetching topic metadata

到了这里,关于Flink 中kafka broker缩容导致Task一直重启的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink源码-Task执行

    上一节我们分析到了Execution的生成,然后调用taskManagerGateway.submitTask方法提交task,提交的时候会将executionVertex封装成TaskDeploymentDescriptor,task的提交与执行涉及到了flink多个组件的配合,之前没有详细讲过,可能有的小伙伴有点不太清楚,这里我们花点时间介绍一下。 1.JobManager

    2024年02月03日
    浏览(37)
  • 深入理解 Flink(八)Flink Task 部署初始化和启动详解

    核心入口: 部署 Task 链条:JobMaster -- DefaultScheduler -- SchedulingStrategy -- ExecutionVertex -- Execution -- RPC请求 -- TaskExecutor JobMaster 向 TaskExecutor 发送 submitTask() 的 RPC 请求,用来部署 StreamTask 运行。TaskExecutor 接收到 JobMaster 的部署 Task 运行的 RPC 请求的时候,就封装了一个 Task 抽象,然

    2024年01月17日
    浏览(75)
  • Flink Task退出流程与Failover机制

    Task.doRun() 引导Task初始化并执行其相关代码的核心方法, 构造并实例化Task的可执行对象: AbstractInvokable invokable。 调用 AbstractInvokable.invoke() 开始启动Task包含的计算逻辑。 当AbstractInvokable.invoke()执行退出后,根据退出类型执行相应操作: 正常执行完毕退出:输出ResultPartition缓冲

    2024年02月22日
    浏览(41)
  • 如何处理 Flink 作业频繁重启问题?

    Flink 实现了多种重启策略 固定延迟重启策略(Fixed Delay Restart Strategy) 故障率重启策略(Failure Rate Restart Strategy) 没有重启策略(No Restart Strategy) Fallback重启策略(Fallback Restart Strategy) Flink支持不同的重启策略,以在故障发生时控制作业如何重启 默认的重启策略:如果没有

    2024年02月10日
    浏览(39)
  • Flink checkpoint操作流程详解与报错调试方法汇总,增量checkpoint原理及版本更新变化,作业恢复和扩缩容原理与优化

    本文主要参考官方社区给出的checkpoint出错类型和种类,以及查找报错的方法。 主要分为两种 Checkpoint Decline 与 Checkpint Expire 两种类型 下面分开讨论 从业务上来讲,Checkpoint 失败可能有较多的影响。 Flink 恢复时间长,会导致服务可用率降低。 非幂等或非事务场景,导致大量业

    2024年02月22日
    浏览(45)
  • (增加细粒度资源管理)深入理解flink的task slot相关概念

    之前对flink的task slot的理解太浅了,重新捋一下相关知识点 我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。 但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多

    2024年03月11日
    浏览(41)
  • flink 最后一个窗口一直没有新数据,窗口不关闭问题

    窗口类型:滚动窗口 代码: 代码部分逻辑说明 若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算) env.getConfig().setAutoWatermarkInterval(5000); 使用自定义的watermark: watermark 周期生成()的疑问: 1、默认200ms,会连续生成4次后,

    2024年01月18日
    浏览(39)
  • Flink 启动就报错,但exception没提示。其中一个task failure 该怎么办?

    最近我在生产又遇到一个问题,就是消费着一段时间之后,忽然就不再消费了,但也不报错。观察了几次,我发现时间基本是停留在上下班高峰期数据量最大的时候。我主观猜测可能是同时间进来的数据过多,处理不来导致的。但这个问题我还没来的及思考怎么处理,因此我

    2024年02月16日
    浏览(55)
  • Flink的checkpoint遇到过什么问题,什么原因导致的

    checkpoint 失败一般都和反压相结合。导致 checkpoint 失败的原因有两个: 1. 数据流动缓慢,checckpoint 执行时间过长 。 我们知道, Flink checkpoint 机制是基于 barrier 的, 在数据处理过程中, barrier 也需要像普通数据一样,在 buffer 中排队,等待被处理。当 buffer 较大或者数据处理较慢

    2024年01月19日
    浏览(39)
  • flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交

    1、背景 flink消费kafka数据,多并发,实现双流join 2、现象 (1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint (2)其中一个流的subtask显示finished (3)无背压 3、问题原因 (1)其中一个topic分区为1 (2)配置的并行

    2024年02月13日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包