Commit cannot be completed since the group has already rebalanced and assign

这篇具有很好参考价值的文章主要介绍了Commit cannot be completed since the group has already rebalanced and assign。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

报错信息

Commit cannot be completed since the group has already rebalanced and assigned the partitions

如何理解

这里是说提交commit失败, 因为这个组已经重新分配了

产生原因

正常情况下, kafka会有一个配置用于设置一条消息的过期时间, 在规定时间内, 如果消费者提交了消费完成的信息, 那么就可以正常的分配下一条记录给消费者, 并且将当前记录的状态记为"已消费"状态, 对消息队列做一个标识, 避免重复消费

如何解决

kafka中配置的规定返回消息时间, 默认是300s, 也就是5分钟, 但是有一些业务逻辑处理起来比较复杂, 数据量又比较庞大, 那么5分钟是肯定处理不完的, 比如导入一个5G的文件, 然后逐条插入数据库, 这就需要消耗很长时间, 所以需要设置一下kafka的最大间隔时间
在application-dev.yml文件中配置如下

Commit cannot be completed since the group has already rebalanced and assign

也就是配置

spring:
	kafka:
		consumer:
			properties:
				max.poll.interval.ms: 86400000

86400000是一天的毫秒数, 我这个业务需求有一天一夜足矣

至此, 问题完美修复!

其它参考方案

  1. 调大max.poll.interval.ms(两次poll方法最大时间间隔),默认时间为300000ms
  2. 调小max.poll.records(一次最多处理的记录数量),默认500
  3. 启动多个线程并行处理数据,但要注意处理完一批消息后才能提交offset,然后进行下次的poll(会用到CountDownLatch)

修改配置参数,调大间隔,调小一次处理的最大任务数量

props.put("max.poll.records", 8);
props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000");

使用多线程并行处理文章来源地址https://www.toymoban.com/news/detail-508985.html

@Scheduled(fixedRate = 5000)
public void processing()
{
    //如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
    //如果队列中有消息,立即消费消息,每次消费的消息的多少
    //可以通过max.poll.records配置
    ConsumerRecords<String, String> records = consumer.poll(3000);
    if (records.count() == 0)
    {
        return;
    }
    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
    CountDownLatch countDownLatch = new CountDownLatch(records.count());
    ConsumerRecord array[] = new ConsumerRecord[records.count()];
    int i;
    for (i = 0; i < records.count(); ++i)
    {
        array[i] = iterator.next();
    }
    for (i = 0; i < records.count(); ++i){
        final int id = i;
        if (id < records.count() - 1)
        {
            new Thread(()-> {
                disposeOneRecord(array[id],false);
                countDownLatch.countDown();
            }).start();
        }
        else
        {
            new Thread(()-> {
                disposeOneRecord(array[id],true);
                countDownLatch.countDown();
            }).start();
        }
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    consumer.commitAsync();
    logger.info(String.format("Successfully processing %d records", records.count()));
}

private void disposeOneRecord(ConsumerRecord<String, String> record, boolean saveInRedis)
{
    String[] split;
    DCSPoint point;
    String rowKey, qualifier, value;
    List<Put> putList = new ArrayList<>();

    Map<String,Object> tagAndValue = JSONObject.parseObject(record.value()).getInnerMap();
    for (String tag : tagAndValue.keySet()) {
        split = tag.split("_");
        if (split.length != 2)
        {
            continue;
        }
        try {
            point = DCSPoint.valueOf(split[1].toUpperCase());
        }catch (IllegalArgumentException e){
            continue;
        }
        if (point.getSection() == Section.UNKNOWN || point.getDataType() != DataType.REAL)
        {
            continue;
        }
        value = tagAndValue.get(tag).toString();
        if (saveInRedis)
        {
            RedisConfig.masterRedis.set(tag, value);
        }
        rowKey = split[0] + "_" + record.key();
        qualifier = split[1];
        putList.add(HBaseDaoUtil.cellPut(rowKey, HBaseConfig.FAMILY,qualifier,value));
    }
    hBaseDao.adds(HBaseConfig.TABLE_NAME, putList);
}

到了这里,关于Commit cannot be completed since the group has already rebalanced and assign的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包