java 集成kafka(支持单条消费和批量消费)

这篇具有很好参考价值的文章主要介绍了java 集成kafka(支持单条消费和批量消费)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、下载安装zk,kafka...(大把教程,不在这里过多阐述)

2、引入pom

		<!--kafka-->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

3、Kafka配置

# kafka bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
spring.kafka.bootstrap-servers=localhost:9092
# 设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=single
## 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
spring.kafka.listener.ack-mode=batch

4、生产者配置

# producer 消费生产者配置-----

# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
spring.kafka.producer.acks=all

# 重试次数 若设置大于0的值,客户端会将发送失败的记录重新发送
spring.kafka.producer.retries=3

# 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少de请求中。这有助于提升客户端和服务器端的性能。
# 这个配置控制一个批次的默认大小(单位 byte)。16384是缺省的配置
spring.kafka.producer.batch-size=16384

# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka,33554432是缺省配置
spring.kafka.producer.buffer-memory=33554432

# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=1000

# KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
spring.kafka.producer.properties.max.block.ms=6000

# 关键字的序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

# 值的序列化类
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

5、生产者发消息的工具类


import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 生产者发送消息工具类
 *
 * @ClassName KafkaSenderUtils
 * @Author destiny
 * @Date 2023/4/21 11:04
 */
@Component
@Slf4j
public class KafkaSenderUtils {
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 构造器方式注入 kafkaTemplate
     */
    public KafkaSenderUtils(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topicName, String msg) {
        try {

            ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, msg);
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("发送成功回调:{}", JSONUtil.toJsonStr(result.getProducerRecord().value()));
                }

                @Override
                public void onFailure(@NonNull Throwable ex) {
                    log.error(">>>>失败原因:{}", ex.getMessage());
                    log.info("发送失败回调");
                }
            });
        } catch (Exception e) {
            log.info("发送异常");
            e.printStackTrace();
        }

    }
}

6、消费着配置

# consumer 消费端的配置,需要给consumer配置一个group-id
spring.kafka.consumer.group-id=test
# key的编解码方法
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value的编解码方法
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
# earliest:无提交记录,从头开始消费
# latest:无提交记录,从最新的消息下一条开始消费
spring.kafka.consumer.auto-offset-reset=latest
# 是否自动提交偏移量offset 默认 true
spring.kafka.consumer.enable-auto-commit=false
# 自动提交的频率。前提是 enable-auto-commit=true 单位 ms
spring.kafka.consumer.auto-commit-interval=100ms
# 一次调用poll()返回的最大记录数,默认是500(批量消费的数量)
spring.kafka.consumer.max-poll-records=5
# session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
spring.kafka.consumer.properties.session.timeout.ms=120000
# 请求超时 单位 ms
spring.kafka.consumer.properties.request.timeout.ms=120000

7、消费者配置类(配置批量消费)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;

import java.util.HashMap;
import java.util.Map;

/**
 * 卡夫卡消费者配置
 *
 * @ClassName KafkaConsumerConfig
 * @Author destiny
 * @Date 2023/2/2 18:53
 */
@Slf4j
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * kafka 集群,broker-list
     */
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    /**
     * 开启自动提交
     */
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    /**
     * 消费者组
     */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    /**
     * 重置消费者的offset
     */
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * 批量拉取个数
     */
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;
    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private String sessionTimeout;

    @Value("${spring.kafka.consumer.properties.request.timeout.ms}")
    private String requestTimeout;

   
    /**
     * 卡夫卡侦听器容器工厂
     *
     * @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link String}, {@link String}>>
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        // 设置消费者组中的线程数量
        factory.setConcurrency(3);
        // 设置轮询超时
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * 消费者工厂
     *
     * @return {@link ConsumerFactory}<{@link Integer}, {@link String}>
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 消费者配置
     *
     * @return {@link Map}<{@link String}, {@link Object}>
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // 自动提交 offset 默认 true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // 自动提交的频率 单位 ms
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // 批量消费最大数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // 请求超时
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
        // Key 反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value 反序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 当kafka中没有初始offset或offset超出范围时将自动重置offset
        // earliest:重置为分区中最小的offset
        // latest:重置为分区中最新的offset(消费分区中新产生的数据)
        // none:只要有一个分区不存在已提交的offset,就抛出异常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

       
        return props;
    }

    /**
     * kafka批量监听
     *
     * @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link Integer}, {@link String}>>
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // 设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        // 设置是否开启批量监听
        factory.setBatchListener(true);
        // 设置消费者组中的线程数量
        factory.setConcurrency(3);

        return factory;
    }

    /**
     * 消费异常处理器
     *
     * @return {@link ConsumerAwareListenerErrorHandler}
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return (message, exception, consumer) -> {
            // 打印消费异常的消息和异常信息
            log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId());
            return exception;
        };
    }
}

8、测试类分别测试单条消费以及批量消费


import cn.hutool.json.JSONUtil;
import com.google.common.collect.Maps;
import com.xiaoju.framework.entity.common.ResultMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;

/**
 * @ClassName KafkaDemo
 * @Author destiny
 * @Date 2023/2/1 16:32
 */
@RestController
@Slf4j
@RequestMapping(value = {"/api/kafka"})
public class KafkaDemo {

    @Resource
    private KafkaSenderUtils kafkaSenderUtils;

    @GetMapping(value = {"/test"})
    public void test() {
        // 单条消费测试
        String message = "message";
        kafkaSenderUtils.send("testSingle", message);
    }

    @GetMapping(value = {"/testBatch"})
    public void testBatch() {
        // 批量消费测试
        String message = "message";
        for (int i = 1; i <= 20; i++) {
            HashMap<String, Object> map = Maps.newHashMap();
            map.put("id", i);
            map.put("message", i + ":" + message + System.currentTimeMillis());
            kafkaSenderUtils.send("batchTest", JSONUtil.toJsonStr(map));
        }
    }

}

9、消费者消费

package cn.ctyuncdn.consumer.kafka;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Kafka消费者
 *
 * @author destiny
 * @date 2023/02/06
 */
@Component
@Slf4j
public class KafkaConsumer {

    // 单条消费
    @KafkaListener(id = "testSingle", topics = {"testSingle"}, groupId = "${spring.kafka.consumer.group-id}")
    public void testSingle(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String msg = record.value();
        log.info("消费者接受消息:topic-->" + topic + ",msg->>" + msg);
    }

    // 批量消费
    @KafkaListener(id = "batchTest", topics = {"batchTest"}, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")
    public void batchTest(List<ConsumerRecord<String, String>> records) {
        log.info(">>>consumer batch size ===>>{}", records.size());
        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            String msg = record.value();
            log.info("消费者接受消息:topic-->" + topic + ",msg->>" + msg);
        }
    }

}

完结。。。文章来源地址https://www.toymoban.com/news/detail-520660.html

到了这里,关于java 集成kafka(支持单条消费和批量消费)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(32)
  • kafka原理五之springboot 集成批量消费

    目录 前言 一、新建一个maven工程,添加kafka依赖 二、yaml配置文件 三、消息消费 手动提交非批量消费   String 类型接入 使用注解方式获取消息头、消息体 手动提交批量消费 ConsumerRecord类接收 String类接收 使用注解方式获取消息头、消息体,则也是使用 List 来接收: 并发消费

    2024年02月14日
    浏览(30)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(36)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(32)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(27)
  • kafka 分布式的情况下,如何保证消息的顺序消费?

    目录 一、什么是分布式 二、kafka介绍 三、消息的顺序消费 四、如何保证消息的顺序消费   分布式是指将计算任务分散到多个计算节点上进行并行处理的一种计算模型。在分布式系统中,多台计算机通过网络互联,共同协作完成任务。每个计算节点都可以独立运行,并且可以

    2024年02月10日
    浏览(42)
  • 分布式环境集成JWT(Java Web Token)

    Token的引入:客户端向服务端请求数据时一般都会加入验证信息,比如客户端在请求的信息中携带用户名、密码,服务端会校验用户名和密码是否正确,校验通过响应该客户端请求。但是每次都携带用户名和密码无疑有些繁琐,而且也不安全,在这种背景下,Token便应运而生。

    2024年02月11日
    浏览(34)
  • Kafka 批量消费

    业务背景 项目有个需求需要统计IM聊天相关数据,原设计思想是在聊天产生时通过消息队列进行数据记录,利用rocketMQ实现。上线后发现由于内部IM活跃用户量级较大,MQ生产者生产消息过多,消费者实时消费会造成服务器CPU和硬盘读写压力,在改不了硬件配置的情况下,笔者

    2024年02月11日
    浏览(23)
  • springboot集成kafka消费数据

    1.1.1.生产者分区策略 轮询策略:Round-robin 策略,即顺序分配,轮询策略有非常优秀的负载均衡表 现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略。(默认、常用) 随机策略:Randomness 策略。所谓随机就是我们随意地将消息放

    2024年01月19日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包