Spring-Kafka如何实现批量消费消息并且不丢失数据

这篇具有很好参考价值的文章主要介绍了Spring-Kafka如何实现批量消费消息并且不丢失数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spring-Kafka如何实现批量消费消息并且不丢失数据

先给答案:

		// 批量消费配置: 1批量, 2手动提交
		factory.setBatchListener(true);
		factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

		// 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
		// 一次poll操作最大获取的记录数量
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
		// 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
		propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
		// 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
		// 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
		propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500

		// 在消费者方法中注入acknowledgment并在执行完业务逻辑后手动调用确认方法
		acknowledgment.acknowledge();

1、背景:

某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同一个对象的处理。其实只需要所有表插入完毕后,一次处理该对象即可。

2、现有技术架构:

mysql --> canal --> Kafka --> Spring-Kafka消费者 --> 下游接口

3、解决方案:

优化Spring-Kafka消费者,从单条处理改为多条处理,然后在消费者中合并相同的对象,从而达到一个对象只处理一次(最多两次)的目的。为什么有可能是两次,因为分批的时候很容易将同一个对象的多条消息分到上下两批,这样这个对象将会被处理两次;那为什么不会处理三次?其实也能够制造出来上次的情况,就是分批的大小如果很小就容易出现三次,甚至更多次。所以通常情况我们将分批大小设置得要比表数量多。

举个例子:一个对象的创建有10张表的插入消息,而分批大小设置成4,此时10条消息就被4这个批大小拆分成3批,每批处理一次该对象,那么该对象就会被处理3次。所以分批大小是一个重要的参数,其值的设定通常要大,但再大也不可避免被分成两批的情况。

4、实现步骤

Spring-kafka从1.1版本开始就支持了批量消费,需要在ContainerFactory中设置batchListener=true
同时设置消费者参数 max.poll.records 来控制一批的最大记录数量,该参数的缺省值为500

AbstractKafkaListenerContainerFactory类的源码如下:

	/**
	 * Set to true if this endpoint should create a batch listener.
	 * @param batchListener true for a batch listener.
	 * @since 1.1
	 */
	public void setBatchListener(Boolean batchListener) {
		this.batchListener = batchListener;
	}

可以从javaDoc的@since得知该功能从1.1版本就存在了,所以只要Spring-Kafka的版本高于1.1的都支持批量消费功能。这个参数是搭配@KafkaListener来使用的。单条消费的写法如下:

@KafkaListener(id = "", groupId = "", topics = {})
public void listen(ConsumerRecord<String, String> data) {
}

以上只能一次处理一条,并不能将多条消息统筹处理,所以使用以下的批量消费的写法:

@KafkaListener(id = "", groupId = "", topics = {}, containerFactory = "")
public void listen(List<ConsumerRecord<String, String>> datas) {
}

从以上代码可以看出,批量消费仅仅是将参数的ConsumerRecord类型改为List<ConsumerRecord>即可。
这样就行了吗?如果批次较大,对这一批数据的处理时间较长,就容易造成丢数据。场景如下:
1、开启自动提交offset,
2、提交offset的时间间隔为1s,
3、处理这一批数据耗时为2s。
还需要发生以下条件才能导致丢消息:
1、“自动提交offset的时间”到了且成功执行了offset的提交
2、此后几乎同时程序发生了严重的错误导致进程退出(注意此时消费者的代码逻辑并未执行完毕)
那么消息就会被丢失,因为Kafka收到了offset的提交,所以Kafka认为这一批消息已经处理成功了,但程序实际并未处理成功,等到下次启动程序,将从Kafka记录的offset开始消费,“记录的offset”就是发生异常退出前“提交的offset”。所以上次异常退出时刻的那一批消息就被丢失了,不会再被消费到。

举例:
假设消费者这一批消费到的消息offset编码列表为5,6,7。而自动提交offset时候会将7提交给Kafka,表示下一个消息将从8开始消费。但567这3条消息在消费者进程中还没来得及处理完毕就被意外终止了。等到人工处理错误重新启动程序,将从8开始消费,因为Kafka认为567已经处理过了,但实际567并没有成功处理,所以就会丢失567这一批的消息。

在进一步,如何防止消息丢失呢?答案是手动提交offset,同样Spring-Kafka已经提供了支持,其实Spring-Kafka只是对原生Kafka的包装,最核心的还是原生Kafka支持手动提交offset的能力。

上干货,Spring-Kafka中有一个类很有用:AcknowledgingMessageListener,这个类就是为了支持手动ack消息的,也即是手动提交offset,只是Spring将“手动提交offset”这个概念包装成了“确认消息”,里面有一个方法:

	/**
	 * Invoked with data from kafka.
	 * @param data the data to be processed.
	 * @param acknowledgment the acknowledgment.
	 */
	@Override
	void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

这个类是设计成去实现它从而获取手动提交offset的能力,但我们还可以简化,结合之前的@KafkaListener的方法,我们将Acknowledgment acknowledgment参数放在@KafkaListener的方法中,Spring就能够将Acknowledgment对象传递进来,从而我们可以自己控制何时“确认消息”。当然仅有这步还不够,还需要告诉Spring-Kafka我需要手动提交offset,通过一个简单的设置即可:

    @Bean(name = "batch_and_manual_ack_ContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batch_and_manual_ack_ContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 批量消费配置: 1批量, 2手动提交
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

其中最重要的一句是:.setAckMode(AckMode.MANUAL_IMMEDIATE);,该配置的缺省值是AckMode.BATCH。可以从ContainerProperties类的源码找到缺省值:

	/**
	 * The ack mode to use when auto ack (in the configuration properties) is false.
	 * <ul>
	 * <li>RECORD: Ack after each record has been passed to the listener.</li>
	 * <li>BATCH: Ack after each batch of records received from the consumer has been
	 * passed to the listener</li>
	 * <li>TIME: Ack after this number of milliseconds; (should be greater than
	 * {@code #setPollTimeout(long) pollTimeout}.</li>
	 * <li>COUNT: Ack after at least this number of records have been received</li>
	 * <li>MANUAL: Listener is responsible for acking - use a
	 * {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
	 * </ul>
	 */
	private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

要想提高吞度量,须要设置一下几个参数:

  • max.poll.records
    一次poll操作最大获取的记录数量,缺省是500。该值越大,则吞吐量也越大,但要求消费者能够在不超时的情况下处理完所有的消息。

  • fetch.min.bytes
    一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 缺省是1B

  • fetch.max.wait.ms
    一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者,缺省是500。
    需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms

5、所有代码:


    @Bean(name = "batch_and_manual_ack_ContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batch_and_manual_ack_ContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        // 批量消费配置: 1批量, 2手动提交
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<String, Object>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

		// 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
		// 一次poll操作最大获取的记录数量
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
		// 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
		propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
		// 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
		// 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
		propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500

        return propsMap;
    }
    
    @KafkaListener(groupId = "junit-test-group", containerFactory = "batch_and_manual_ack_ContainerFactory", topics = {"test"})
    public void test_batchConsume(List<ConsumerRecord<String, String>> datas, Acknowledgment acknowledgment) {
        System.out.println(new Date() + " datas = " + datas.size());
        System.out.println(new Date() + " collect = " + datas.stream().map(t -> t.offset()).collect(Collectors.toList()));
        // 最后一定要提交进度 (用于持久化进度到Kafka)
        acknowledgment.acknowledge();
    }

将以上代码放在某个Spring的类里,然后修改配置即可使用文章来源地址https://www.toymoban.com/news/detail-637139.html

到了这里,关于Spring-Kafka如何实现批量消费消息并且不丢失数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 如何保证消息的消费顺序

    我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是: 更改用户会员等级。 根据会员等级计算订单价格。 假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

    2024年02月13日
    浏览(54)
  • kafka 如何保证消息的顺序消费

    在Kafka分布式集群中,要保证消息的顺序消费,您可以采取以下措施: 分区策略 :Kafka的主题可以分为多个分区,每个分区内的消息是有序的。因此,首先要确保生产者将相关的消息发送到同一个分区。这可以通过生产者的分区策略来实现。默认情况下,Kafka会使用基于消息

    2024年02月06日
    浏览(39)
  • Kafka如何保证消息⼀定能被消费

    Kafka 通过多种机制来保证消息一定能被消费,从而实现数据的可靠性和持久性。 以下是一些常见的方法和策略来提高消息的可靠性: 复制机制: Kafka 使用了分区和副本的概念。每个分区可以有多个副本,分布在不同的 Broker 上。当消息写入到一个分区时,它会被复制到该分

    2024年02月12日
    浏览(47)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(54)
  • Kafka 如何保证消息消费的全局顺序性

    哈喽大家好,我是咸鱼 今天我们继续来讲一讲 Kafka 当有消息被生产出来的时候,如果没有指定分区或者指定 key ,那么消费会按照【轮询】的方式均匀地分配到所有可用分区中,但不一定按照分区顺序来分配 我们知道,在 Kafka 中消费者可以订阅一个或多个主题,并被分配一

    2024年02月05日
    浏览(47)
  • kafka如何保证消息不被重复消费

    (1)kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。但是当我们直接kill进程

    2024年02月11日
    浏览(51)
  • Kafka、RocketMQ、RabbitMQ如何保证消息的顺序消费?

    一、1个Topic(主题)只创建1个Partition (分区),这样生产者的所有数据都发送到了一个Partition (分区),保证了消息的消费顺序; 二、生产者在发送消息的时候指定要发送到哪个 Partition,这样同一个 Partition 的数据会被同一个消费者消费,从而保证了消息的消费顺序。 实现思路

    2024年02月09日
    浏览(48)
  • 消息队列-Kafka-消费方如何分区与分区重平衡

    消费分区 资料来源于网络 消费者订阅的入口:KafkaConsumer#subscribe 消费者消费的入口:KafkaConsumer#poll 处理流程: 对元数据重平衡处理:KafkaConsumer#updateAssignmentMetadataIfNeeded 协调器的拉取处理:onsumerCoordinator#poll 执行已完成的【消费进度】提交请求的回调函数:invokeCompletedOff

    2024年04月14日
    浏览(33)
  • spring-kafka之请求响应模式

            kafka是一款性能强劲的分布式流式处理软件,被广泛用于大数据应用场景。所以很多小伙伴对kafka肯定不会陌生,但是kafka的请求响应模式估计使用的却不一定很多。首先简单唠叨下什么是请求响应模式,这个类似于http请求一样发出请求能够在一个请求中返回结果

    2024年02月11日
    浏览(43)
  • 在Spring Boot微服务集成spring-kafka操作Kafka集群

    记录 :461 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群。使用KafkaTemplate操作Kafka集群的生产者Producer。使用@KafkaListener操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details

    2024年02月10日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包