SpringBoot 集成 Kafka 配置

这篇具有很好参考价值的文章主要介绍了SpringBoot 集成 Kafka 配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

原生模式

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

自定义分区器

/**
 * 自定义分区器
 *
 * @Author: chen yang
 * @Date: 2023/5/7 11:34
 */
public class CustomerPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int partition = 2;
        return partitionInfos.size() % partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

生产者

/**
 * @Author: chen yang
 * @Date: 2023/5/6 21:59
 */
public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties properties = new Properties();
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // 序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class.getName());


        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");


        // 设置 ack
        // 0:不需要等待数据落盘应答,1:需要等 leader 落盘应答,-1(all):需要等 leader 和所有的 follower(isr队列) 落盘应答
        // type: String, valid values [all, -1, 0, 1], default: all
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // 设置重试次数,默认为 int 最大值
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        // 设置事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "default_transactional_id_23");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 开启事务
        producer.initTransactions();
        producer.beginTransaction();

        try {
            // 异步发送
            producer.send(new ProducerRecord<>("test", "this is async message"));

            producer.send(new ProducerRecord<>("test", "this is a async rollback message!"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 发送消息失败会自动重试,不需要在回调函数中手动重试
                    if (Objects.isNull(e)){
                        System.out.println("result: " + recordMetadata.topic() + ", partitions: " + recordMetadata.partition());
                    }
                }
            });

            // 同步发送,只需要在异步发送的基础上再调用 get() 犯法即可
            producer.send(new ProducerRecord<>("test", 1,"","this is sync message")).get();
            producer.commitTransaction();

        }catch (Exception e){
            producer.abortTransaction();
        }finally {
            producer.close();
        }
    }
}

消费者

/**
 * @Author: chen yang
 * @Date: 2023/7/9 10:37
 */
public class Consumer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_01");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
              // 订阅 topic
//            ArrayList<String> topics = new ArrayList<>();
//            consumer.subscribe(topics);

            // 订阅 topic 下的 partition
            ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
            topicPartitions.add(new TopicPartition("night_topic", 1));
            consumer.assign(topicPartitions);

            // 从指定的 offset 开始消费
//            consumer.seek(new TopicPartition("night_topic", 1), 3);

            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
            consumerRecords.forEach(System.out::println);

            // 手动提交 offset
            consumer.commitAsync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

KafkaTemplate

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      batch-size: 16384
      acks: -1
      retries: 10
      transaction-id-prefix: transaction_05
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger:
          ms: 2000
        partitioner:
          class: com.night.config.CustomerPartitionHandler
    consumer:
      group-id: g_01
      enable-auto-commit: false
      auto-offset-reset: latest
      max-poll-records: 500
#      auto-commit-interval: 2000  autoCommit = false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session:
          timeout:
            ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)
        request:
          timeout:
            ms: 18000

    listener:
      missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
      type: batch

自定义分区器

/**
 * @Author: chen yang
 * @Date: 2023/7/8 11:02
 */
@Component
public class CustomerPartitionHandler implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (value.toString().contains("二")){
            return 2;
        }else if (value.toString().contains("一")){
            return 1;
        }else {
            return 0;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

生产者

@RestController
@RequiredArgsConstructor
public class HelloController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    @Transactional // 配置文件中设置了事务id,那么启用时要加上 该注解或者使用 kafka 事务处理
    public Boolean send(String msg){
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("night.topic", null, "night key - " + i, msg + " - " + i).addCallback(success -> {
                // 成功回调
                if (success == null || success.getRecordMetadata() == null){
                    return;
                }
                String topic = success.getRecordMetadata().topic();
                int partition = success.getRecordMetadata().partition();
                long offset = success.getRecordMetadata().offset();
                String key = success.getProducerRecord().key();
                System.out.println("send topic:" + topic +", partition: " + partition + ", key:" + key + ", offset: " + offset);
            }, failure -> {
                // 失败回调
                System.out.println("发送消息失败:" + failure.getMessage());
            });
        }
        return true;
    }
}

消费者配置

消费数据过滤
/**
 * 消费数据过滤
 *
 * @Author: chen yang
 * @Date: 2023/7/8 12:20
 */
@Component
public class ConsumerFilterStrategy implements RecordFilterStrategy<String, String> {
    @Override
    public boolean filter(ConsumerRecord<String, String> consumerRecord) {
        // return true: 丢弃消息
        return consumerRecord.value().contains("无效数据");
    }
}
消费异常处理类
/**
 * 消费异常处理类
 *
 * @Author: chen yang
 * @Date: 2023/7/8 11:55
 */
@Component
public class ConsumerExceptionHandler implements ConsumerAwareListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
        System.out.println("消费异常:" + message.getPayload() + ", ex: " + e.getMessage());
        return null;
    }
}
消费者配置 
/**
 * @Author: chen yang
 * @Date: 2023/7/8 12:22
 */
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final ConsumerFilterStrategy consumerFilterStrategy;


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> filterContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // @SendTo 使用,console exception:  a KafkaTemplate is required to support replies
        factory.setReplyTemplate(kafkaTemplate);

        factory.setConsumerFactory(initConsumerFactory());
        // 设置并发量,小于或等于Topic的分区数,并且要在consumerFactory设置一次拉取的数量
        factory.setConcurrency(1);

        // 设置为批量监听
        factory.setBatchListener(true);

        // 配合RecordFilterStrategy使用,被过滤的信息将被丢弃
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(consumerFilterStrategy);
        return factory;
    }



    @Bean
    public ConsumerFactory<String, String> initConsumerFactory(){
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_id_02");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return new DefaultKafkaConsumerFactory<>(configs);
    }
}

@KafkaListener

@Component
public class HelloListener {

    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "night.topic", partitions = {"0"})
    }, errorHandler = "consumerExceptionHandler", containerFactory = "filterContainerFactory")
    public void consumer0(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer){
        System.out.println("first consumer receive list size: " + records.size());
        records.forEach(System.out::println);
        consumer.commitSync();
    }


    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "night.topic", partitions = {"1"})
    }, errorHandler = "consumerExceptionHandler")
    @SendTo("singleTopic")
    public String consumer1(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer){
        System.out.println("second consumer receive list size: " + records.size());
        records.forEach(System.out::println);
        consumer.commitSync();
        return "@SendTo annotation msg";
    }


    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "night.topic", partitions = {"2"})
    }, errorHandler = "consumerExceptionHandler")
    public void consumer2(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer){
        System.out.println("third consumer receive list size: " + records.size());
        records.forEach(System.out::println);
        consumer.commitSync();
    }
}

文章来源地址https://www.toymoban.com/news/detail-557474.html

到了这里,关于SpringBoot 集成 Kafka 配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 在Spring Boot微服务集成Kafka客户端(kafka-clients)操作Kafka

    记录 :459 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.ne

    2024年02月12日
    浏览(52)
  • 【Spring Boot】集成Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月09日
    浏览(49)
  • 从零开始学Spring Boot系列-集成Kafka

    Apache Kafka是一个开源的分布式流处理平台,由LinkedIn公司开发和维护,后来捐赠给了Apache软件基金会。Kafka主要用于构建实时数据管道和流应用。它类似于一个分布式、高吞吐量的发布-订阅消息系统,可以处理消费者网站的所有动作流数据。这种动作流数据包括页面浏览、搜

    2024年03月21日
    浏览(63)
  • kafka--技术文档--spring-boot集成基础简单使用

            查阅了很多资料了解到,使用了spring-boot中整合的kafka的使用是被封装好的。也就是说这些使用其实和在linux中的使用kafka代码的使用其实没有太大关系。但是逻辑是一样的。这点要注意! 核心配置为: 如果在下面规定了spring-boot的版本那么就不需要再使用版本号,如

    2024年02月11日
    浏览(48)
  • 使用Spring Boot集成中间件:Kafka的高级使用案例讲解

    在实际应用中,Kafka作为一种强大的分布式消息系统,广泛应用于实时数据处理和消息传递。本文将通过一个全面的使用案例,详细介绍如何使用Spring Boot集成Kafka,并展示其在实际场景中的应用。 在开始之前,我们需要确保已经完成以下准备工作: 安装并启动Kafka集群 创建

    2024年02月01日
    浏览(54)
  • Springboot Kafka 集成配置

    Springboot 配置使用 Kafka 前言 一、Linux 安装 Kafka 二、构建项目 三、引入依赖 四、配置文件 生产者 yml 方式 Config 方式 消费者 yml 方式 Config 方式 五、开始写代码 生产者 发送 成功回调和异常处理 消费者 接收 异常处理 七、开始测试 测试普通单条消息 测试消费者异常处理 测试

    2024年02月08日
    浏览(47)
  • SpringBoot 集成 Kafka 配置

    自定义分区器 生产者 消费者 配置文件 自定义分区器 生产者 消费者配置 消费数据过滤 消费异常处理类 消费者配置  @KafkaListener

    2024年02月15日
    浏览(37)
  • SpringBoot集成Kafka 配置工具类

    spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTemplate,封装了各种方法,方便操作,它封装了apache的kafka-client,不需要再导入client依赖 YML配置 简单工具类,能满足正常使用,主题是无法修改的 发送消息 使用异步 建立主题 如果broker端配置auto.create.topics.enable为

    2024年02月08日
    浏览(38)
  • spring boot配置双Kafka方法

    第一步:application.yml的配置 第二步:配置config 注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行 第三步: 直接在你要用到的类中直接引用就行。 跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有

    2024年02月11日
    浏览(41)
  • SpringBoot项目集成kafka及常规配置

            使用 spring-kafka 的api,在springboot项目中集成kafka能力,封装配置。 1.1 KafkaConfiguration 公共配置 1.2 KafkaConsumerConfiguration 消费者配置 1.3 KafkaListenerConfiguration 监听配置 1.4 KafkaProducerConfiguration 生产者配置 2.1 ConsumerFactoryBuilder 消费者工厂 2.2 ProducerFactoryBuilder 生产者工

    2024年02月16日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包