Kafka重复消费以及消费线程安全关闭的解决方案

这篇具有很好参考价值的文章主要介绍了Kafka重复消费以及消费线程安全关闭的解决方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景和原因分析

Kafka消费程序每次重启都会出现重复消费的情况,考虑是在kill掉程序的时候,有部分消费完的数据没有提交offsect。

props.setProperty("enable.auto.commit", "true");

此处表明自动提交,即延迟提交(poll的时候会根据配置的自动提交时间间隔去进行检测并提交)。当kill掉程序的时候,可能消费完的数据还没有到达提交的时间点程序就被kill掉了。

重复消费解决方案:

关闭自动提交,采用异步提交+同步提交的方式来提交offsect。

// 关闭自动提交
props.setProperty("enable.auto.commit", "false");
// 消费逻辑
try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, byte[]> record : records) {
            // 具体业务逻辑
        }
        consumer.commitAsync();
    }
    System.out.println("while end.");
} catch (Exception e) {
    System.err.println("consume error..." + e.getMessage());
} finally {
    try {
        consumer.commitSync();
        System.out.println("commit sync suc.");
    } catch (Exception e) {
        System.err.println("commit sync error." + e.getMessage());
    } finally {
        consumer.close();
        System.out.println("close.");
    }
}

这样还不够,当kill掉程序的时候,会发现并没有走到finally中。说明线程非正常停止。

线程安全关闭解决方案:

1.使用线程池来运行线程
2.在实例销毁前使用结束标志手动停止线程
3.使用CountDownLatch等待线程停止

第一步:定义线程池

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    int cpuCoreNum = Runtime.getRuntime().availableProcessors();
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(cpuCoreNum);
    threadPoolTaskExecutor.setMaxPoolSize(cpuCoreNum * 2);
    threadPoolTaskExecutor.setQueueCapacity(2000);
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setThreadNamePrefix("global_thread_pool_task_executor");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    threadPoolTaskExecutor.setAwaitTerminationSeconds(10);// 确保该值是线程池中各个线程阻塞的最大时长
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;
}

此处两个配置参数至关重要
setWaitForTasksToCompleteOnShutdown(true)表示等待正在进行和排队的任务完成。
threadPoolTaskExecutor.setAwaitTerminationSeconds(10)虽然我们已经配置为等待正在进行和排队的任务完成,但Spring仍然会继续关闭容器的其余部分。这可能会释放任务执行器所需的资源,并导致任务失败。配置这个最大等待时间可以确保在指定的时间段内,容器级别的关闭过程将被阻止。
等待时间设置多少具体看线程池中业务线程最大耗时来定。
如果不停止线程,就会超过线程池的等待时间。通过以下WARN日志可以发现,在停止线程池的时候仍然存在业务线程没有停掉的情况,所以还需要定义一个标志来手动停止线程。

WARN 11472 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Timed out while waiting for executor 'threadPoolTaskExecutor' to terminate

第二步:定义结束标志,并在对象销毁前停止线程

// 线程中断标志
public volatile boolean running = true;
while (running) {
	...
}

然后再实现DisposableBean接口中的destroy方法,在实例销毁之前将running置为false停止线程

@Override
public void destroy() throws Exception {
    this.running = false; // 循环并非立即停止,而是等到当前执行的循环体执行结束才会停止,所以这个地方的等待时间需要与线程池中的setAwaitTerminationSeconds参数相对应
}

当destroy方法运行结束,系统就会销毁掉当前实例,接着就会开始销毁当前实例的依赖(没有被其它实例所引用的话),而此时需要注意的是线程其实并没有运行结束。所以问题出现了:线程还在运行中,而运行所需要的资源(比如Redis连接资源)被提前关闭了,就会导致异常出现。所以在将running置为false之后还需要使用CountDwonLatch等待线程结束,再接着销毁其它依赖。
此处省略第三步,直接上完整的样例代码:文章来源地址https://www.toymoban.com/news/detail-545707.html

@Component
public class ConsumerClosedSafely implements CommandLineRunner, DisposableBean {

    private volatile boolean running = true;
    private final CountDownLatch latch = new CountDownLatch(1);
    private final String[] topics = new String[]{"test"};

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public void consume() throws Exception{
        Properties props = new Properties();
        //TODO 其它属性
        props.setProperty("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topics));
        // 消费逻辑
        try {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    //TODO 具体业务逻辑
                }
                consumer.commitAsync();
            }
            System.out.println("while end.");
        } catch (Exception e) {
            System.err.println("consume error..." + e.getMessage());
        } finally {
            try {
                consumer.commitSync();
                System.out.println("commit sync suc.");
            } catch (Exception e) {
                System.err.println("commit sync error." + e.getMessage());
            } finally {
                consumer.close();
                System.out.println("close.");
                // 计数器减一
                latch.countDown();
                System.out.println("latch count down .");
            }
        }
    }

    @Override
    public void run(String... args) throws Exception {
        Runnable r = ()->{
            try {
                consume();
            } catch (Exception e) {
                System.exit(1);
            }
        };
        threadPoolTaskExecutor.execute(r);
    }

    @Override
    public void destroy() throws Exception {
        // 终止循环
        this.running= false;
        // 等待运行结束
        latch.await();
    }
}

到了这里,关于Kafka重复消费以及消费线程安全关闭的解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka如何避免消息重复消费

    Kafka 避免消息重复消费通常依赖于以下策略和机制: Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。 Kafka会记录每个消费者组消

    2024年01月15日
    浏览(38)
  • 记一次Kafka重复消费解决过程

            起因:车联网项目开发,车辆发生故障需要给三个系统推送消息,故障上报较为频繁,所以为了不阻塞主流程,采用了使用kafka。消费方负责推送并保存推送记录,但在一次压测中发现,实际只发生了10次故障,但是推送记录却有30多条。         问题排查,发现

    2024年02月13日
    浏览(55)
  • 解决Kafka新消费者组导致重复消费的问题

             问题描述 :在使用Kafka时,当我们向新的消费者组中添加消费者时,可能会遇到重复消费的问题。本文将介绍一些解决这个问题的方法,帮助开发者更好地处理Kafka中的消费者组和消费偏移量。         Kafka是一个强大的分布式消息队列系统,但在使用过程中

    2024年02月07日
    浏览(44)
  • Kafka入门,漏消费和重复消费, 消费者事务,数据积压(二十四)

    重复消费:已经消费了数据,但是offset没提交。 漏消费:先提交offset后消费,有可能会造成数据得漏消费 如果向完成consumer端得进准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如MySQ

    2024年02月15日
    浏览(39)
  • Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)

    1.1 Kafka消费方式 1、pull(拉)模式:consumer采用从broker中主动拉取数据。 2、push(推)模式:Kafka没有采用这种方式。因为broker决定消息发生速率,很难适应所有消费者的消费速率。例如推送的速度是50M/s,Consumer1、Consumer2就来不及处理消息。 pull模式不足之处是如果Kafka没有数

    2024年02月16日
    浏览(44)
  • 【RabbitMQ | 第六篇】消息重复消费问题及解决方案

    什么是 消息重复消费 ?首先我们来看一下消息的传输流程。消息生产者–MQ–消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。 所以消息重复也就出现在 两个阶段 1 :生产者多发送了消息给MQ; 2 :MQ的一条

    2024年04月26日
    浏览(47)
  • springboot 开启和关闭kafka消费

    配置自定义容器工厂 在消费监听器上使用工厂,并设置id 这样,启动项目后,就不会自动消费了。

    2024年02月03日
    浏览(45)
  • kafka如何保证消息不被重复消费

    (1)kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。但是当我们直接kill进程

    2024年02月11日
    浏览(48)
  • 【JAVA】生产环境kafka重复消费问题记录

    业务系统每周都有定时任务在跑,由于是大任务因此采用分而治之思想将其拆分为多个分片小任务采用 kafka异步队列消费 的形式来减少服务器压力,每个小任务都会调用后台的c++算法,调用完成之后便会回写数据库的成功次数。今天观测到定时任务的分片小任务存在被重复消

    2024年04月12日
    浏览(41)
  • Kafka 入门到起飞 - Kafka怎么做到保障消息不会重复消费的? 消费者组是什么?

    消费者 : 1、订阅Topic(主题) 2、从订阅的Topic消费(pull)消息, 3、将消费消息的offset(偏移量)保存在Kafka内置的一Topic名字是_consumer_offsets的主题中,在Kafka的logs文件下能看到这👟文件,存放的是消息的偏移量数据 消费者组 : 1、订阅同一个Topic的消费者可以加入到一个

    2024年02月15日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包