实现 Kafka 分区内消费者多线程顺序消费

这篇具有很好参考价值的文章主要介绍了实现 Kafka 分区内消费者多线程顺序消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在1个topic中,有3个partition,那么如何保证数据的顺序消费?

生产者在写的时候,可以指定一个 key,被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是没有错乱的。

但是消费者里可能会有多个线程来并发处理消息,而多个线程并发处理的话,顺序可能就乱掉了。

解决方案

写 n 个 queue,将具有相同key的数据都存储在同一个 queue,然后对于 n 个线程,每个线程分别消费一个 queue 即可,并手动提交位点。由于 kafka consumer 实例不支持多线程同时提交位点,这里采取全局记数器的方式,在每一批次记录的消费过程中,每消费完一条记录则全局记数器加 1,全局记数器等于这一批记录的总条数时提交位点。

在Java中,可以使用多线程和队列来实现对具有相同 key 的数据进行消费,并通过手动提交位点来保证数据的消费。以下是一个带有手动位点提交的解决方案的示例代码:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DataConsumer {
    private Map<String, BlockingQueue<String>> queues;
    private Map<String, Integer> offsets;

    public DataConsumer(int numThreads) {
        queues = new HashMap<>();
        offsets = new HashMap<>();

        // 创建N个队列和位点
        for (int i = 0; i < numThreads; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            String key = Integer.toString(i);
            queues.put(key, queue);
            offsets.put(key, 0);

            // 创建并启动消费线程
            Thread consumerThread = new Thread(new Consumer(queue, key));
            consumerThread.start();
        }
    }

    public void consumeData(String key, String data) {
        BlockingQueue<String> queue = queues.get(key);
        if (queue != null) {
            try {
                // 将数据放入对应的队列
                queue.put(data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void commitOffset(String key, int offset) {
        offsets.put(key, offset);
        System.out.println("Committed offset for key " + key + ": " + offset);
    }

    private static class Consumer implements Runnable {
        private final BlockingQueue<String> queue;
        private final String key;
        private int offset;

        public Consumer(BlockingQueue<String> queue, String key) {
            this.queue = queue;
            this.key = key;
            this.offset = 0;
        }

        @Override
        public void run() {
            // 消费队列中的数据
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String data = queue.take();
                    // 进行消费逻辑
                    System.out.println("Consumed data: " + data);
                    offset++;

                    // 模拟提交位点
                    if (offset % 10 == 0) {
                        DataConsumer.getInstance().commitOffset(key, offset);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private static DataConsumer instance;

    public static synchronized DataConsumer getInstance() {
        if (instance == null) {
            instance = new DataConsumer(3);
        }
        return instance;
    }

    public static void main(String[] args) {
        DataConsumer dataConsumer = DataConsumer.getInstance();

        // 模拟产生数据
        for (int i = 0; i < 30; i++) {
            dataConsumer.consumeData(Integer.toString(i % 3), "Data " + (i + 1));
        }
    }
}

在以上代码中,DataConsumer 类维护了一个 Map 来存储队列和位点的关系。每个消费者线程都有一个对应的位点来记录消费的进度。

在 commitOffset 方法中,根据 key 提交位点的偏移值。

消费线程在每次成功消费一条数据后,更新位点,并判断是否满足提交位点的条件。这里模拟每消费10条数据提交一次位点。

在 main 方法中,通过 consumeData 方法模拟了产生了30条数据,并将它们放入不同的队列中进行消费。文章来源地址https://www.toymoban.com/news/detail-729626.html

到了这里,关于实现 Kafka 分区内消费者多线程顺序消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • kafka消费者api和分区分配和offset消费

    kafka消费者api和分区分配和offset消费

    消费者的消费方式为主动从broker拉取消息,由于消费者的消费速度不同,由broker决定消息发送速度难以适应所有消费者的能力 拉取数据的问题在于,消费者可能会获得空数据 Consumer Group(CG):消费者组 由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同

    2024年02月16日
    浏览(11)
  • kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

    kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

    默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. 二、定义两个消费者,给其配置上述PartitionAssignor. 在kafka创建只有一个分区的topic : study2023 创建一个生产者往study2023这个 topic发送消息: 分别

    2024年02月10日
    浏览(14)
  • Kafka有几种消费者分区分配策略?

    Kafka有几种消费者分区分配策略?

    Range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。 注意:Rangle范围分配策略是针对每个Topic的。 配置 配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。 算法公式 n = 分区数量 / 消费者数量 m = 分区数量 % 消费

    2024年02月08日
    浏览(7)
  • kafka消费者组的分区分配策略

    一个consumer group有多个consumer,一个topic有多个partition,所以就会设计到分区分配的问题,需要确定哪些分区由哪些消费者消费。 当消费者组中的消费者发生变化,减少或者增加的时候,就会执行分区分配策略,需要重新洗牌。 分区分配策略主要有两种,第一种是Range范围分区

    2024年02月16日
    浏览(8)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(17)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(15)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(12)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(16)
  • Kafka及Kafka消费者的消费问题及线程问题

    Topic:是 Kafka 消息发布和订阅的基本单元,同时也是消息的容器。Topic 中的消息被分割成多个分区进行存储和处理。 Partition:是 Topic 分区,将 Topic 细分成多个分区,每个分区可以独立地存储在不同的 Broker 中,从而增加了消息的并发性、可扩展性和吞吐量。 Broker:是 Kafka

    2024年02月14日
    浏览(15)
  • Kafka系列之:记录一次Kafka Topic分区扩容,但是下游flink消费者没有自动消费新的分区的解决方法

    Kafka系列之:记录一次Kafka Topic分区扩容,但是下游flink消费者没有自动消费新的分区的解决方法

    生产环境Kafka集群压力大,Topic读写压力大,消费的lag比较大,因此通过扩容Topic的分区,增大Topic的读写性能 理论上下游消费者应该能够自动消费到新的分区,例如flume消费到了新的分区,但是实际情况是存在flink消费者没有消费到新的分区 出现无法消费topic新的分区这种情况

    2024年02月14日
    浏览(18)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包