kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池

这篇具有很好参考价值的文章主要介绍了kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

网上搜索kafka消费者通过多线程进行顺序消费的内容都不太理想,或者太过复杂,所以自己写了几个demo,供大家参考指正。

需求内容

        单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum(客户账号)的数据需要保证消费的顺序。

注意点

1、如果1秒钟生产1000条数据,消费者处理时,每条数据需要500毫秒,则消费者每次拉取数据的条数最好能控制在500条以上,这样1秒内的数据可以拉取两次,每次使用500个线程进行处理,每次耗时500ms,

        2*500ms=1秒,基本可以保证1000条数据能够在1秒内处理完成。

如果消费者每100ms拉取一次,每次拉取100条数据,消费者使用100个线程处理这100条数据,耗时500ms,第二次再拉取100条,耗时500ms...这样处理完1秒内的1000条数据将一共需要

        10次*500ms=5秒钟,出现较大延迟。

        同时,还要注意,一批数据中存在相同的accNum(客户账号)的情况,如果存在2条相同的accNum,因为需要顺序执行,一条执行需要500ms,两条顺序执行完成将花费1秒,这批数据的整体完成时间将变为1秒。

        注意这三个参数的调整:

        // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
        // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // max.poll.records: 一次拉取的最大条数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);

        注意消费者的拉取延迟时间:

        tKafkaConsumer.poll(500);

2、每批次数据处理时,创建的线程数,会根据每次拉取的数据条数自动调整,最大线程数为消费者每次允许拉取的最大数据条数。这样系统可以根据数据量大小自动调整创建的线程数,线程池中的空闲线程可以在一定时间后自动释放。可以保证不同accNum(客户账号)的数据每次都分配一个线程单独处理,从而保证处理的时间(500ms)。

第一种使用纯线程方式(Thread+Callable+FutureTask)

因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。
package com.autoee.demo.kafka.main;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
import com.autoee.demo.riskmonitor.BusiDataEntity;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Title: <br>
 * Desc: <br>
 * Date: 2022-8-19 <br>
 * @author Double
 * @version 1.0.0
 */
public class KafkaConsumerMutiThreadsTest3_Callable_HashMap {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest3_Callable_HashMap.class);

    // 设置main方法执行时的日志输出级别
    static {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> {
            logger.setLevel(Level.INFO);
        });
    }

    // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序

    // 测试极限情况:数据已存在大量积压,启动消费者进行消费
    // 每次拉取都达到设置的单次可以拉取的最大条数:2000条


    public static void main(String[] args) throws InterruptedException { 

        Properties props = new Properties();
        // bootstrap.servers:kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 消费者组id
        props.put("group.id", "test_consumer_group"); //消费者组
        // key.deserializer:key的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value.deserializer:value的反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
        // fetch.max.bytes:一次拉取的最大数据量:50M
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // max.poll.records: 一次拉取的最大条数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
        // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
        // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
        // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
        // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
        // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // enable.auto.commit:是否允许自动提交offset,默认是。
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        // session.timeout.ms:session过期时间,默认10秒。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        // partition.assignment.strategy:分区分配策略,默认5分钟。
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

        KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
        tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));

        HashMap<String, List<BusiDataEntity>> hashMap;
        while (true) {
            TimeInterval timer = DateUtil.timer();
            logger.info("[开始]-consumer拉取数据");
            ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
            int dataCount = records.count();
            AtomicInteger tAtomicInteger = new AtomicInteger();
            logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
            // 拉取的数据条数大于0时,才进行处理操作
            timer = DateUtil.timer();
            if (dataCount > 0) {
                // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
                // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
                // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
                // [线程执行完成]消费者线程:consumer-thread-VV0039-已处理数据数量=3-已处理的所有客户账号=VV0039,VV0039,VV0039,
                // [线程执行完成]消费者线程:consumer-thread-AG0097-已处理数据数量=2-已处理的所有客户账号=AG0097,AG0097,
                // [线程执行完成]消费者线程:consumer-thread-ID0045-已处理数据数量=1-已处理的所有客户账号=ID0045,
                int arrListCapacity = dataCount * 2;
                hashMap = new HashMap<>(arrListCapacity);
                // 将拉取的数据按客户号码分散到HashMap中
                for (ConsumerRecord<String, String> record : records) {
                    Object value = record.value();
                    String jsonStr = JSONUtil.toJsonStr(value);
                    // logger.info("[获取]-传入报文=[{}]", jsonStr);
                    BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
                    String accNum = busiDataEntity.getAccNum();

                    if (hashMap.containsKey(accNum)) {
                        hashMap.get(accNum).add(busiDataEntity);
                    } else {
                        List<BusiDataEntity> newList = new ArrayList<>();
                        newList.add(busiDataEntity);
                        hashMap.put(accNum, newList);
                    }
                }

                ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
                // 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
                int num = 0;
                hashMap.forEach((k, v) -> {
                    List<BusiDataEntity> busiDataEntities = v;
                    String threadName = "";
                    if (busiDataEntities.size() > 0) {
                        threadName = "consumer-thread-" + k;
                        // 使用Callable执行一组数据
                        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
                            @Override
                            public String call() {
                                String threadName = Thread.currentThread().getName();
                                // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
                                String allAccNum = "";
                                for (BusiDataEntity busiDataEntity : busiDataEntities) {
                                    allAccNum = allAccNum + busiDataEntity.getAccNum() + ",";
                                    try {
                                        // 模拟业务处理时间,默认500ms
                                        Thread.sleep(500);
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                }
                                return "消费者线程:" + threadName + "-已处理数据数量=" + busiDataEntities.size() + "-已处理的所有客户账号=" + allAccNum;
                            }
                        });
                        
                        // 启动一个线程执行一组数据
                        new Thread(futureTask, threadName).start();
                        // 将每个线程的futureTask都放入同一个ArrayList中
                        tFutureTaskArrayList.add(futureTask);
                    }
                });
                // 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
                // 因为每次处理都创建新的线程,大量线程同时创建和销毁,线程数波动剧烈,考虑通过线程池进行优化
                for (int i = 0; i < tFutureTaskArrayList.size(); i++) {
                    try {
                        String returnStr = tFutureTaskArrayList.get(i).get();
                        logger.info("[线程执行完成]" + returnStr);
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }

            //同步提交offset
            // tKafkaConsumer.commitSync();
            //异步提交
            tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        logger.error("[失败]-提交offset失败!" + offsets);
                    } else {
                        logger.info("[成功]-提交offset成功!");
                    }
                }
            });

            logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
        }

    }
}

kafka多线程消费如何保证顺序,kafka,kafka,Consumer,多线程,顺序消费,Executors

测试结果:


    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1731]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1678]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1637]

    // 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
    // 因为每次处理都创建新的线程,造成大量线程同时创建和销毁,线程数波动剧烈,GC频繁,系统各项指标均不平稳。

第二种使用Executors线程池(Executors+Callable+FutureTask)

通过线程池进行处理,线程数一直保持在2000个左右。
package com.autoee.demo.kafka.main;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.json.JSONUtil;
import com.autoee.demo.riskmonitor.BusiDataEntity;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Title: <br>
 * Desc: <br>
 * Date: 2022-8-19 <br>
 * @author Double
 * @version 1.0.0
 */
public class KafkaConsumerMutiThreadsTest4_Executors_HashMap {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest4_Executors_HashMap.class);

    // 设置main方法执行时的日志输出级别
    static {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> {
            logger.setLevel(Level.INFO);
        });
    }

    // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序

    // 测试极限情况:数据已存在大量积压,启动消费者进行消费
    // 每次拉取都达到设置的单次可以拉取的最大条数:2000条

    public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        // bootstrap.servers:kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 消费者组id
        props.put("group.id", "test_consumer_group"); //消费者组
        // key.deserializer:key的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value.deserializer:value的反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
        // fetch.max.bytes:一次拉取的最大数据量:50M
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // max.poll.records: 一次拉取的最大条数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
        // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
        // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
        // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
        // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
        // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // enable.auto.commit:是否允许自动提交offset,默认是。
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        // session.timeout.ms:session过期时间,默认10秒。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        // partition.assignment.strategy:分区分配策略,默认5分钟。
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

        KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
        tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));

        // 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
        // corePoolSize=0
        // maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
        // keepAliveTime=60秒
        // 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
        // 可以自定义线程池进行优化
        ExecutorService executorService = Executors.newCachedThreadPool();

        HashMap<String, List<BusiDataEntity>> busiDataHashMap;
        while (true) {
            TimeInterval timer = DateUtil.timer();
            logger.info("[开始]-consumer拉取数据");
            ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
            int dataCount = records.count();
            AtomicInteger tAtomicInteger = new AtomicInteger();
            logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
            // 拉取的数据条数大于0时,才进行处理操作
            timer = DateUtil.timer();
            if (dataCount > 0) {
                // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
                // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
                // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
                // [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
                // [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
                // [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
                int capacity = dataCount * 2;
                busiDataHashMap = new HashMap<>(capacity);
                // 将拉取的数据按客户号码分散到HashMap中
                for (ConsumerRecord<String, String> record : records) {
                    Object value = record.value();
                    String jsonStr = JSONUtil.toJsonStr(value);
                    // logger.info("[获取]-传入报文=[{}]", jsonStr);
                    BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
                    String accNum = busiDataEntity.getAccNum();

                    if (busiDataHashMap.containsKey(accNum)) {
                        busiDataHashMap.get(accNum).add(busiDataEntity);
                    } else {
                        List<BusiDataEntity> newList = new ArrayList<>();
                        newList.add(busiDataEntity);
                        busiDataHashMap.put(accNum, newList);
                    }
                }

                ArrayList<FutureTask<String>> tFutureTaskArrayList = new ArrayList<>(dataCount);
                // 循环hashMap,每个value开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
                int num = 0;
                busiDataHashMap.forEach((k, v) -> {
                    List<BusiDataEntity> busiDataEntities = v;
                    String threadName = "";
                    if (busiDataEntities.size() > 0) {
                        threadName = k;
                        // 使用Callable执行同一个Key下的一组数据
                        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
                            @Override
                            public String call() {
                                String threadName = Thread.currentThread().getName();
                                // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
                                String allAccNum = "";
                                for (BusiDataEntity busiDataEntity : busiDataEntities) {
                                    allAccNum = allAccNum + busiDataEntity.getAccNum() + ",";
                                    try {
                                        // 模拟业务处理时间,默认500ms
                                        Thread.sleep(500);
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                }
                                return "消费者线程:" + threadName + "-已处理数据数量=" + busiDataEntities.size() + "-已处理的所有客户账号=" + allAccNum;
                            }
                        });

                        // 通过线程池进行任务处理
                        executorService.submit(futureTask);
                        // 将每个线程的futureTask都放入同一个ArrayList中
                        tFutureTaskArrayList.add(futureTask);
                    }
                });
                // 循环tFutureTaskArrayList,检查所有futureTask是否都已经返回,没返回的阻塞等待,等都返回后证明所有线程都执行完成,提交offset
                // 使用线程池后,线程数一直保持在2000个左右。
                for (int i = 0; i < tFutureTaskArrayList.size(); i++) {
                    try {
                        String returnStr = tFutureTaskArrayList.get(i).get();
                        logger.info("[线程执行完成]" + returnStr);
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }

            //同步提交offset
            // tKafkaConsumer.commitSync();
            //异步提交
            tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        logger.error("[失败]-提交offset失败!" + offsets);
                    } else {
                        logger.info("[成功]-提交offset成功!");
                    }
                }
            });

            logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());

        }

    }
}

kafka多线程消费如何保证顺序,kafka,kafka,Consumer,多线程,顺序消费,Executors

测试结果:

    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1731]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1678]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1637]

    // 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
    // 使用线程池后,线程数一直保持在2000个左右

第三种使用Executors线程池(Executors+Runnable+CountDownLatch)

package com.autoee.demo.kafka.main;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.json.JSONUtil;
import com.autoee.demo.riskmonitor.BusiDataEntity;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Title: <br>
 * Desc: <br>
 * Date: 2022-8-19 <br>
 * @author Double
 * @version 1.0.0
 */
public class KafkaConsumerMutiThreadsTest5_Executors_HashMap_CountDownLatch {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerMutiThreadsTest5_Executors_HashMap_CountDownLatch.class);

    // 设置main方法执行时的日志输出级别
    static {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List<ch.qos.logback.classic.Logger> loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> {
            logger.setLevel(Level.INFO);
        });
    }

    // 需求内容:单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum的数据需要保证消费的顺序

    // 测试极限情况:数据已存在大量积压,启动消费者进行消费
    // 每次拉取都达到设置的单次可以拉取的最大条数:2000条

    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[5]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1731]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[4]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1678]
    // [开始]-consumer拉取数据
    // [完成]-consumer拉取数据-条数=[2000]-耗时=[23]
    // [成功]-提交offset成功!
    // 【完成处理数据】-条数=[2000]-耗时=[1637]

    // 测试结果:2000条可以在2秒处理完成,则可以保证1000条时可以在1秒能处理完成,满足需求内容。
    // 通过线程池进行处理,线程数非常平稳,而且只需要十个左右线程就能处理每次2000条的数据。

    public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        // bootstrap.servers:kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 消费者组id
        props.put("group.id", "test_consumer_group"); //消费者组
        // key.deserializer:key的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value.deserializer:value的反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // fetch.max.bytes:一次拉取的最小可返回数据量:1Bety
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100 * 1024);
        // fetch.max.bytes:一次拉取的最大数据量:50M
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        // fetch.max.wait.ms:一次拉取的最大等待时间:500ms
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // max.poll.records: 一次拉取的最大条数
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
        // max.partition.fetch.bytes:一次拉取时,每个分区最大拉取数据量,默认1M
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1 * 1024 * 1024);
        // auto.offset.reset:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了)时,自动设置开始消费的偏移量位置,默认latest。
        // earliest:自动重置偏移量到最早的偏移量(从头开始消费)。
        // latest:默认,自动重置偏移量为最新的偏移量(从最新的接收到的数据开始消费)。
        // none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // enable.auto.commit:是否允许自动提交offset,默认是。
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒。
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // heartbeat.interval.ms:消费者心跳检测时间间隔,默认3秒。
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        // session.timeout.ms:session过期时间,默认10秒。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // max.poll.interval.ms:一批次数据最大可以执行时间,默认5分钟。
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        // partition.assignment.strategy:分区分配策略,默认5分钟。
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

        KafkaConsumer<String, String> tKafkaConsumer = new KafkaConsumer<String, String>(props);
        tKafkaConsumer.subscribe(Arrays.asList("riskMoniterTopic"));

        // 使用Executors中的CachedThreadPool,初始核心线程数为0,最大线程数为无限大,线程最大空闲时间为60秒
        // corePoolSize=0
        // maximumPoolSize=Integer.MAX_VALUE,即2147483647,基本属于无界。
        // keepAliveTime=60秒
        // 工作队列使用没有容量的 SynchronousQueue,来一个任务处理一个任务,不进行缓存。如果提交任务速度高于线程池中线程处理任务的速度,则会不断创建新线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
        // 可以自定义线程池进行优化
        ExecutorService executorService = Executors.newCachedThreadPool();

        HashMap<String, List<BusiDataEntity>> busiDataHashMap;
        while (true) {
            TimeInterval timer = DateUtil.timer();
            logger.info("[开始]-consumer拉取数据");
            ConsumerRecords<String, String> records = tKafkaConsumer.poll(500);
            int dataCount = records.count();
            AtomicInteger tAtomicInteger = new AtomicInteger();
            logger.info("[完成]-consumer拉取数据-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
            // 拉取的数据条数大于0时,才进行处理操作
            timer = DateUtil.timer();
            if (dataCount > 0) {
                // 初始化hashMap:容量可以设置为拉取数据条数的1倍,或者2倍,2倍更加分散
                // 消费者参数中设置一次拉取的最大条数为2000,基本不会超过该值。
                // hashMap的hash碰撞概率较低,2000条数据,分布到4000容量的hashMap中时,基本不会出现碰撞,只有相同的key会在一起,导致整体执行时间为相同多个key顺序执行的时间
                // [线程执行完成]消费者线程:pool-1-thread-1898-已处理数据数量=3-已处理的所有客户账号=GW0032,GW0032,GW0032,
                // [线程执行完成]消费者线程:pool-1-thread-1193-已处理数据数量=2-已处理的所有客户账号=KE0055,KE0055,
                // [线程执行完成]消费者线程:pool-1-thread-1187-已处理数据数量=2-已处理的所有客户账号=0E0005,0E0005,
                int capacity = dataCount * 2;
                busiDataHashMap = new HashMap<>(capacity);
                // 将拉取的数据按客户号码分散到ArrayList中
                for (ConsumerRecord<String, String> record : records) {
                    Object value = record.value();
                    String jsonStr = JSONUtil.toJsonStr(value);
                    // logger.info("[获取]-传入报文=[{}]", jsonStr);
                    BusiDataEntity busiDataEntity = JSONUtil.toBean(jsonStr, BusiDataEntity.class);
                    String accNum = busiDataEntity.getAccNum();

                    if (busiDataHashMap.containsKey(accNum)) {
                        busiDataHashMap.get(accNum).add(busiDataEntity);
                    } else {
                        List<BusiDataEntity> newList = new ArrayList<>();
                        newList.add(busiDataEntity);
                        busiDataHashMap.put(accNum, newList);
                    }
                }

                // 循环ArrayList,每个下标中的List数据条数大于0时,开启一个线程循环处理该List中的全部数据,保证数据处理的顺序
                int num = 0;
                int busiDataHashMapSize = busiDataHashMap.keySet().size();
                // 使用CountDownLatch判断是否所有子线程都已执行完成,子线程个数等于busiDataHashMap中key的个数
                CountDownLatch tCountDownLatch = new CountDownLatch(busiDataHashMapSize);
                busiDataHashMap.forEach((k, v) -> {
                    List<BusiDataEntity> busiDataEntities = v;
                    String threadName = "";
                    if (busiDataEntities.size() > 0) {
                        threadName = k;
                        // 使用Runnable执行同一个Key下的一组数据
                        Runnable runnableTask = new Runnable() {
                            @Override
                            public void run() {
                                String threadName = Thread.currentThread().getName();
                                // logger.info("[获取]-消费者线程:{}-获取到待处理数据数量:{}", threadName, busiDataEntities.size());
                                String allAccNum = "";
                                String allBatchNo = "";
                                for (BusiDataEntity busiDataEntity : busiDataEntities) {
                                    allAccNum = allAccNum + busiDataEntity.getAccNum() + ",";
                                    allBatchNo = allBatchNo + busiDataEntity.getBatchNo() + ",";
                                    try {
                                        // 模拟业务处理时间,默认500ms
                                        Thread.sleep(500);
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                }
                                logger.info("[线程执行完成]-消费者线程:" + threadName + "-已处理数据数量=" + busiDataEntities.size() + "-已处理的所有客户账号=" + allAccNum + "-已处理的所有批次号=" + allBatchNo);
                                // 每个线程处理完成后,将tCountDownLatch减1
                                tCountDownLatch.countDown();
                            }
                        };

                        // 通过线程池进行任务处理
                        executorService.submit(runnableTask);
                    }
                });

                // 通过CountDownLatch阻塞等待,等待所有线程都执行完成,提交offset
                tCountDownLatch.await();

                //同步提交offset
                // tKafkaConsumer.commitSync();
                //异步提交
                tKafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            logger.error("[失败]-提交offset失败!" + offsets);
                        } else {
                            logger.info("[成功]-提交offset成功!");
                        }
                    }
                });

                logger.info("【完成处理数据】-条数=[{}]-耗时=[{}]", dataCount, timer.intervalMs());
                logger.info("----------------------------------------------------------------------------------------------------------------------------------------");

            }
        }
    }
}

kafka多线程消费如何保证顺序,kafka,kafka,Consumer,多线程,顺序消费,Executors

测试结果:

        和第二种的执行时间差不多,但是各项性能指标好像更加平稳了,但是很出现线程阻塞的情况。

如果对您有帮助,请我喝杯咖啡吧!

kafka多线程消费如何保证顺序,kafka,kafka,Consumer,多线程,顺序消费,Executors文章来源地址https://www.toymoban.com/news/detail-611477.html

到了这里,关于kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka-consumer-groups.sh消费者组管理

      先调用 MetadataRequest 拿到所有在线Broker列表 再给每个Broker发送 ListGroupsRequest 请求获取 消费者组数据。 查看指定消费组详情 --group 查看所有消费组详情 --all-groups 查询消费者成员信息 --members 查询消费者状态信息 --state 删除指定消费组 --group 删除所有消费组 --all-groups 想要

    2024年02月03日
    浏览(40)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(43)
  • Kafka及Kafka消费者的消费问题及线程问题

    Topic:是 Kafka 消息发布和订阅的基本单元,同时也是消息的容器。Topic 中的消息被分割成多个分区进行存储和处理。 Partition:是 Topic 分区,将 Topic 细分成多个分区,每个分区可以独立地存储在不同的 Broker 中,从而增加了消息的并发性、可扩展性和吞吐量。 Broker:是 Kafka

    2024年02月14日
    浏览(39)
  • 实现 Kafka 分区内消费者多线程顺序消费

    生产者在写的时候,可以指定一个 key,被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。 消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是没有错乱的。 但是消费者里可能会有多个线程来并发处理消息,而多个线程并发

    2024年02月07日
    浏览(39)
  • 【Kafka-Consumer分区分配策略】Kafka 消费者组三种分区分配策略 Range Assignor、RoundRobin Assignor、Sticky Assignor 详细解析

    1、一个 consumer group 中有多个 consumer 组成,一个 topic 有多个 partition 组成,现在的问题是,到底由哪个 consumer 来消费哪个 partition 的数据。 2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数 partition.assignment.strategy ,修改分区的分配

    2024年02月22日
    浏览(44)
  • 探究:kafka生产者/消费者与多线程安全

    目录 1. 多线程安全 1.1. 生产者是多线程安全的么? 1.1. 消费者是多线程安全的么? 2. 消费者规避多线程安全方案 2.1. 每个线程维护一个kafkaConsumer 2.2. [单/多]kafkaConsumer实例 + 多worker线程 2.3.方案优缺点对比         Kafka生产者是 线程安全 的,可以在多个线程中共享一个

    2023年04月26日
    浏览(88)
  • kafka消费者报错Offset commit ......it is likely that the consumer was kicked out of the group的解决

    2022年10月份接到一个小功能,对接kafka将数据写到数据库,开始的需求就是无脑批量insert,随着时间的推移,业务需求有变更,kafka的生产消息频次越来越高,到今年7月份为止就每秒会有几十条甚至上百条,然后消费消息的代码就报错: Caused by: org.apache.kafka.clients.consumer.Com

    2024年02月07日
    浏览(47)
  • JUC并发编程-线程和进程、Synchronized 和 Lock、生产者和消费者问题

    源码 + 官方文档 面试高频问! java.util 工具包、包、分类 业务:普通的线程代码 Thread Runnable Runnable 没有返回值、效率相比入 Callable 相对较低! 线程、进程,如果不能使用一句话说出来的技术,不扎实! 进程:一个程序,QQ.exe Music.exe 程序的集合; 一个进程往往可以包含多

    2024年01月20日
    浏览(50)
  • [RocketMQ] Consumer消费者启动主要流程源码 (六)

    客户端常用的消费者类是DefaultMQPushConsumer, DefaultMQPushConsumer的构造器以及start方法的源码。 1.创建DefaultMQPushConsumer实例 最终都是调用下面四个参数的构造函数: 指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器, 创建了一个DefaultMQPushConsumerImpl实例

    2024年02月16日
    浏览(45)
  • rabbitmq之Consumer Prefetch(消费者预取)

    官方文档: https://www.rabbitmq.com/consumer-prefetch.html https://www.rabbitmq.com/confirms.html#channel-qos-prefetch 测试”消息积压“场景:在消费者没有启动的情况下,生产者先生产很多消息。然后先开启一个a消费者,再开启b消费者,发现只有a消费者不断的消费旧的消息,而b消费者”无动于

    2024年02月11日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包