Springboot Kafka 集成配置

这篇具有很好参考价值的文章主要介绍了Springboot Kafka 集成配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Springboot 配置使用 Kafka
前言
一、Linux 安装 Kafka
二、构建项目
三、引入依赖
四、配置文件
生产者
yml 方式
Config 方式
消费者
yml 方式
Config 方式
五、开始写代码
生产者
发送
成功回调和异常处理
消费者
接收
异常处理
七、开始测试
测试普通单条消息
测试消费者异常处理
测试延时消息
测试批量消息
测试手动控制消费者监听
总结
前言
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,yml 配置,Config 工厂代码配置都有,batch-size、acks、offset、auto-commit、trusted-packages、poll-timeout、linger 应有尽有,批量消费、开启事务、定义批量消费数量、延时发送、失败重试、异常处理你还想要什么

As we all know,当今世界最流行的消息中间件有 RabbitMq、RocketMq、Kafka,其中,应用最广泛的是 RabbitMq,RocketMq 是阿里巴巴的产品,性能超过 RabbitMq,已经经受了多年的双11考验,但是怕哪天阿里不维护了,用的人不多,Kafka 是吞吐量最大的一个,远超前两个,支持事务、可保证消息的不丢失(网上说的事务和消息可靠性不支持是说的旧版,2以后就开始支持了),对比来讲,Kafka相对于前两个,只有一个劣势,不太支持延时队列,其他方面都要优于它们(个人使用体验,勿喷)。

一、Linux 安装 Kafka
我的另一篇文章:Debian(Linux通用)安装 Kafka 并配置远程访问

二、构建项目
多模块项目构建,这里不讲,如果你不会,就新建两个普通的web项目 KafkaConsumer和 KafkaProvider就行

三、引入依赖
新建一个标准的spring-web项目,额外依赖真的只需要这一个,网上说的 kafka-client 不是springboot 的东西,那就是个原生的 kafka 客户端, kafka-test也不需要,这个是用代码控制broker的东西

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

四、配置文件
这两种方式的代码会互相覆盖,而且有些配置只能用 config 方式配置,建议像我一样,两种都写,config里面的配置参数从 yml 中获取,就可以不影响使用 Nacos 来在线修改 kafka 的配置了

生产者
配置的意思详解在注释里面都有哦

yml 方式

server:
  port: 8081
spring:
  kafka:
    producer:
      # Kafka服务器
      bootstrap-servers: 175.24.228.202:9092
      # 开启事务,必须在开启了事务的方法中发送,否则报错
      transaction-id-prefix: kafkaTx-
      # 发生错误后,消息重发的次数,开启事务必须设置大于0。
      retries: 3
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      # 开启事务时,必须设置为all
      acks: all
      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 生产者内存缓冲区的大小。
      buffer-memory: 1024000
      # 键的序列化方式
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Config 方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 
 * @date 2022/10/31 18:05
 * kafka配置,也可以写在yml,这个文件会覆盖yml
 */
@SpringBootConfiguration
public class KafkaProviderConfig {
   

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.transaction-id-prefix}")
    private String transactionIdPrefix;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Bean
    public Map<String, Object> producerConfigs() {
   
        Map<String, Object> props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
        //acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
        //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
        //开启事务必须设为all
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        //发生错误后,消息重发的次数,开启事务必须大于0
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        //当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
        //批次的大小可以通过batch.size 参数设置.默认是16KB
        //较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。
        //比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟
        //实测batchSize这个参数没有用
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,
        //即使数据没达到16KB,也将这个批次发送出去
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
        //生产者内存缓冲区的大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //反序列化,和生产者的序列化方式对应
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
   
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        //开启事务,会导致 LINGER_MS_CONFIG 配置失效
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }

    @Bean
    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
   
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
   
        return new KafkaTemplate<>(producerFactory());
    }
}

消费者
yml 方式

server:
  port: 8082
spring:
  kafka:
    consumer:
      # Kafka服务器
      bootstrap-servers: 175.24.228.202:9092
      group-id: firstGroup
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      #auto-commit-interval: 2s
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
      # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
      properties:
        spring:
          json:
            trusted:
              packages: "*"
      # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
      # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
      # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
      # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
      # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
      # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
      max-poll-records: 3
    properties:
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      max:
        poll:
          interval:
            ms: 600000
      # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
      session:
        timeout:
          ms: 10000
    listener:
      # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
      concurrency: 4
      # 自动提交关闭,需要设置手动消息确认
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
      missing-topics-fatal: false
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      poll-timeout: 600000

Config 方式

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author
 * @date 2022/10/31 18:05
 * kafka配置,也可以写在yml,这个文件会覆盖yml
 */
@SpringBootConfiguration
public class KafkaConsumerConfig {
   

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.properties.session.timeout.ms}")
    private String sessionTimeout;
    @Value("${spring.kafka.properties.max.poll.interval.ms}")
    private String maxPollIntervalTime;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;
    @Value("${spring.kafka.listener.missing-topics-fatal}")
    private boolean missingTopicsFatal;
    @Value("${spring.kafka.listener.poll-timeout}")
    private long pollTimeout;

    @Bean
    public Map<String, Object> consumerConfigs() {
   
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //自动提交的时间间隔,自动提交开启时生效
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        //该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
        //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
        //latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
        //none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
        //这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
        //这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
        //如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
        //然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
        //要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
        //注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        //序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return propsMap;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
   
        //配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
        try(JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {
   
            deserializer.trustedPackages("*");
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);
        }
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //在侦听器容器中运行的线程数,一般设置为 机器数*分区数
        factory.setConcurrency(concurrency);
        //消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
        factory.setMissingTopicsFatal(missingTopicsFatal);
        //自动提交关闭,需要设置手动消息确认
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        //设置为批量监听,需要用List接收
        //factory.setBatchListener(true);
        return factory;
    }
}

五、开始写代码
下面我们开始写 Kafka 的消息发送代码

生产者
发送
KafkaController用于发送消息到 Kafka

import icu.xuyijie.provider.entity.User;
import icu.xuyijie.provider.handler.KafkaSendResultHandler;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author 
 * @date 2022/10/31 14:05
 * kafka发送消息
 */
@RestController
@RequestMapping("/provider")
//这个注解代表这个类开启Springboot事务,因为我们在Kafka的配置文件开启了Kafka事务,不然会报错
@Transactional(rollbackFor = RuntimeException.class)
public class KafkaController {
   

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    public KafkaController(KafkaTemplate<Object, Object> kafkaTemplate, KafkaSendResultHandler kafkaSendResultHandler) {
   
        this.kafkaTemplate = kafkaTemplate;
        //回调方法、异常处理
        this.kafkaTemplate.setProducerListener(kafkaSendResultHandler);
    }

    @RequestMapping("/sendMultiple")
    public void sendMultiple() {
   
        String message = "发送到Kafka的消息";
        for (int i = 0;i < 10;i++) {
   
            kafkaTemplate.send("topic1", "发送到Kafka的消息" + i);
            System.out.println(message + i);
        }
    }

    @RequestMapping("/send")
    public void send() {
   
    	//这个User的代码我没放出来,自己随便写一个实体类,实体类一定要 implements Serializable
        User user = new User(1, "jaxon");
        kafkaTemplate.send("topic1", user);
        kafkaTemplate.send("topic2", "发给topic2");
    }

	/**
     * Kafka提供了多种构建消息的方式
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws TimeoutException
     */
    public void SendDemo() throws ExecutionException, InterruptedException, TimeoutException {
   
        //后面的get代表同步发送,括号内时间可选,代表超过这个时间会抛出超时异常,但是仍会发送成功
        kafkaTemplate.send("topic1", "发给topic1").get(1, TimeUnit.MILLISECONDS);

        //使用ProducerRecord发送消息
        ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("topic.quick.demo", "use ProducerRecord to send message");
        kafkaTemplate.send(producerRecord);

        //使用Message发送消息
        Map<String, Object> map = new HashMap<>();
        map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
        map.put(KafkaHeaders.PARTITION_ID, 0);
        map.put(KafkaHeaders.MESSAGE_KEY, 0);
        GenericMessage<Object> message = new GenericMessage<>("use Message to send message", new MessageHeaders(map));
        kafkaTemplate.send(message);
    }
}

成功回调和异常处理
KafkaSendResultHandler

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

/**
 * @author 
 * @date 2022/10/31 15:41
 * kafka消息发送回调
 */
@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
   

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
   
        System.out.println("消息发送成功:" + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
   
        System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage());
    }
}

消费者
接收
KafkaHandler用于接收 Kafka 里的消息

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Objects;

/**
 * @author 
 * @date 2022/10/31 14:04
 * kafka监听消息
 */
@RestController
public class KafkaHandler {
   

    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public KafkaHandler(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
   
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
    }

    /**
     * 监听kafka消息
     *
     * @param consumerRecord kafka的消息,用consumerRecord可以接收到更详细的信息,也可以用String message只接收消息
     * @param ack  kafka的消息确认
     * 使用autoStartup = "false"必须指定id
     */
    @KafkaListener(topics = {
   "topic1", "topic2"}, errorHandler = "myKafkaListenerErrorHandler")
//    @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = {"topic1", "topic2"}, autoStartup = "false")
    public void listen1(ConsumerRecord<Object, Objects> consumerRecord, Acknowledgment ack) {
   
        try {
   
            //用于测试异常处理
            //int i = 1 / 0;
            System.out.println(consumerRecord.value());
        } finally {
   
            //手动确认
            ack.acknowledge();
        }
    }

    /**
     * 下面的方法可以手动操控kafka的队列监听情况
     * 先发送一条消息,因为autoStartup = "false",所以并不会看到有消息进入监听器。
     * 接着启动监听器,/start/webGroup。可以看到有一条消息进来了。
     * pause是暂停监听,resume是继续监听
     *
     * @param listenerId consumer的group-id
     */
    @RequestMapping("/pause/{listenerId}")
    public void stop(@PathVariable String listenerId) {
   
        Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).pause();
    }

    @RequestMapping("/resume/{listenerId}")
    public void resume(@PathVariable String listenerId) {
   
        Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).resume();
    }

    @RequestMapping("/start/{listenerId}")
    public void start(@PathVariable String listenerId) {
   
        Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).start();
    }
}

异常处理
MyKafkaListenerErrorHandler

import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author
 * @date 2022/10/31 15:27
 * 异常处理
 */
@Component
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
   

    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception) {
   
        return new Object();
    }

    @Override
    @NonNull
    public Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
   
        System.out.println("消息详情:" + message);
        System.out.println("异常信息::" + exception);
        System.out.println("消费者详情::" + consumer.groupMetadata());
        System.out.println("监听主题::" + consumer.listTopics());
        return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
    }
}

七、开始测试
启动生产者和消费者,消费者控制台打印出我配置的 group-id webGroup id就是启动成功了,如果启动报错不会解决,可以评论区留言

测试普通单条消息
浏览器访问 http://127.0.0.1:8081/provider/send 来调用生产者发送一条消息,生产者控制台打印出回调,消费者控制台输出接收到的消息

测试消费者异常处理
把消费者里的 listen1 方法里的这行代码取消注释

//用于测试异常处理
int i = 1 / 0;
重启消费者,访问 http://127.0.0.1:8081/provider/send ,发现消费者虽然报错但是没有抛出异常,而是被我们处理了

测试延时消息
发送延时消息要关闭事务,在生产者的 yml 和 config 配置文件里把下面代码注释掉

开启事务,必须在开启了事务的方法中发送,否则报错

transaction-id-prefix: kafkaTx-

//开启事务,会导致 LINGER_MS_CONFIG 配置失效
//factory.setTransactionIdPrefix(transactionIdPrefix);
然后重新请求http://127.0.0.1:8081/provider/send,发现 5s 后消息发出,配置延迟时间的配置是props.put(ProducerConfig.LINGER_MS_CONFIG, “5000”);,其实这个不是真正的延时消息,Kafka实现真正的延时消息要使用JDK的DelayQueue手动实现。

测试批量消息
打开消费者的 config 配置里 setBatchListener 这一行代码,我们定义的 MAX_POLL_RECORDS_CONFIG 为3,即每次批量读取3条消息,批量监听需要用List接收,listen1方法的参数加一个List包起来

//设置为批量监听,需要用List接收

factory.setBatchListener(true);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
public void listen1(List<ConsumerRecord<Object, Objects>> consumerRecord, Acknowledgment ack)

注意!!!Debug消费者,因为我们要打断点观察每次接收的条数
调用消费者接口http://127.0.0.1:8081/provider/sendMultiple批量发送10条,可以看到消费者每次只接收3条

测试手动控制消费者监听
@KafkaListener这样写,id 和 autoStartup 是关键

@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = {
   "topic1", "topic2"}, autoStartup = "false")

重启消费者,调用生产者接口http://127.0.0.1:8081/provider/send,我们发现这次消费者没有接收到消息,因为我们关闭了 autoStartup

要开始接收的话,调用消费者接口http://127.0.0.1:8082/start/firstGroup,这个方法可以启动 group-id 为 firstGroup 的 @KafkaListener,然后我们发现消费者控制台接收到消息

http://127.0.0.1:8082/pause/firstGroup暂停接收
http://127.0.0.1:8082/resume/firstGroup恢复接收文章来源地址https://www.toymoban.com/news/detail-720174.html

到了这里,关于Springboot Kafka 集成配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(62)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(43)
  • 八、SpringBoot集成Kafka

    1、修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息 2、创建 controller 从浏览器接收数据, 并写入指定的 topic 1、修改 SpringBoot 核心配置文件 application.propeti 2、创建类消费 Kafka 中指定 topic 的数据 一些配置总结

    2024年02月10日
    浏览(38)
  • SpringBoot——集成Kafka详解

    1.1、引入依赖 1.2、application.yml配置 1.3、简单生产 1.4、简单消费 2.1、带回调的生产者 kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法, 2.2、监听器 Kafka提供了ProducerListener 监听器来异步监听生产者消息

    2024年02月08日
    浏览(45)
  • Kafka集成springboot

    安装kafka,直接到官网下载bin文件,本文使用windows进行使用kafka。 下载之后,第一步,启动zookeeper: zookeeper-server-start.bat ....configzookeeper.properties  第二步,启动kafka: kafka-server-start.bat ....configserver.properties  第三步,在pom中导入依赖: 第四步,修改yml文件,添加配置:

    2024年02月04日
    浏览(36)
  • Springboot 集成kafka

    一、创建项目并导入pom依赖 二、修改application.yml配置 1. producer 生产端的配置 2. consumer 消费端的配置,需要给consumer配置一个group-id 三、生产者生产消息,消费者消费消息 1. 简单消费 producer生产者中使用自动注入的方式创建KafkaTemplate 对象 consumer消费消息,使用@KafkaListener注解

    2024年02月09日
    浏览(33)
  • SpringBoot集成kafka全面实战

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》、《秒懂kafka HA(高可用)》两篇文章。 一、生产者实践 普通生产者 带回调的生产者 自定义分区器 kafka事务提交 二、消费者实践 简单消费 指定topic、partition、of

    2024年02月15日
    浏览(45)
  • SpringBoot 集成 Kafka 高级实现

    1、简介         之前博客中记录了直接使用Kafka客户端实现生产者和消费者之间的交互,这种方式通过设置各种参数编码繁琐,因此通过SpringBoot集成Kafka成为一种常用的实现,下面就详细介绍 SpringBoot 是如何和Kafka进行集成的,本文主要参考官网进行学习(Messaging)。 2、引

    2024年01月16日
    浏览(45)
  • SpringBoot3集成Kafka

    目录 一、简介 二、环境搭建 1、Kafka部署 2、Kafka测试 3、可视化工具 三、工程搭建 1、工程结构 2、依赖管理 3、配置文件 四、基础用法 1、消息生产 2、消息消费 五、参考源码 标签:Kafka3.Kafka-eagle3; Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、

    2024年02月12日
    浏览(38)
  • springboot集成kafka消费数据

    1.1.1.生产者分区策略 轮询策略:Round-robin 策略,即顺序分配,轮询策略有非常优秀的负载均衡表 现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略。(默认、常用) 随机策略:Randomness 策略。所谓随机就是我们随意地将消息放

    2024年01月19日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包