spring 中kafka的基本使用

这篇具有很好参考价值的文章主要介绍了spring 中kafka的基本使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

默认大家都是maven工程

第一步

在pom文件中 引入

<dependencies>
    <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

在项目中创建多线程消费者的类,因为频繁创建和销毁线程也会有性能消耗,所以先创建线程池

spring-kafka,spring boot,spring,kafka,java

 

package com.adasplus.gps_handler.server;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class KafkaMessageListener{

    private final int coreNum = Runtime.getRuntime().availableProcessors();

    private ThreadPoolExecutor threadPoolExecutor;

    @PostConstruct
    private void initThreadPool() {
        threadPoolExecutor = new ThreadPoolExecutor(coreNum,
                2 * coreNum,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    @Autowired
    private GpsInsertWithMultiThreading gpsInsertWithMultiThreading;

    @KafkaListener(topics = {"#{'${spring.kafka.consumer.topics}'.split(',')}"}, containerFactory = "batchFactory")
    public void getMessage(List<ConsumerRecord<String, String>> records) {
        try {
            log.info("consumer kafka data one: {} ", records);
            log.info("consumer kafka data count: {} ", records.size());
            gpsInsertWithMultiThreading.setThreadPoolExecutor(threadPoolExecutor);
            gpsInsertWithMultiThreading.execute(records);
        } catch (Exception e) {
            log.error("handle kafka data error:{} ,stack:{}", e.getMessage(), e.getStackTrace());
        }
    }
}
GpsInsertWithMultiThreading为处理后续业务的中间层

@Slf4j
@Service
@Data
public class GpsInsertWithMultiThreading {

    //线程池
    private ThreadPoolExecutor threadPoolExecutor;
    //从kafka拉取下来的数据
    private List<ConsumerRecord<String, String>> data

    public void execute(List<ConsumerRecord<String, String>> data) {
        try {
            handle(data);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("处理异常,error:{},stack:{}", e.getMessage(), e.getStackTrace());
        }
    }

    public void handle(List<ConsumerRecord<String, String>> data) {
        //这个过程中可以对数据进行格式化,比如将data转为json
        HashMap<String, ArrayList<JSONObject>> gpsMap = Maps.newHashMap();
        
        startThreadPool(gpsMap);
    }

    private void startThreadPool(HashMap<String, ArrayList<JSONObject>> gpsMap) {
        try {
                //GpsHandlerCallable类为具体要执行的业务逻辑,比如写库,更新redis等,需是个线程
                threadPoolExecutor.submit(new GpsHandlerCallable());
            
        } catch (Exception e) {
            log.error("失败,error:{},stackTrace:{}", e.getMessage(), e.getStackTrace());
        }
    }
}

那kafka的相关配置在哪里呢 比如链接kafka的服务地址啊,组id啊,我们从上面图片中看到了个 config文件夹,在里面创建BatchConsumerConfig类,具体内容如下

package com.adasplus.gps_handler.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;


@Configuration
@EnableKafka
public class BatchConsumerConfig {

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupID;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPoll;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private int autoCommitInterval;

    /**
     * 多线程-批量消费
     *
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 控制多线程消费
        factory.setConcurrency(1);
        // poll超时时间 5 秒
        factory.getContainerProperties().setPollTimeout(5000);
        // 控制批量消费
        // 设置为批量消费,每个批次数量在Kafka配置参数中设置(max.poll.records)
        factory.setBatchListener(true);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 消费者配置
     *
     * @return
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> configProps = new HashMap<>();
        // 不用指定全部的broker,它将自动发现集群中的其余的borker, 最好指定多个,万一有服务器故障
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //topic
        // key序列化方式
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // value序列化方式
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // GroupID
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        // 批量消费消息数量
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPoll);

        // -----------------------------额外配置,可选--------------------------
        // 自动提交偏移量
        // 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率
        // 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
        // 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // 自动提交的频率
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        // Session超时设置
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5 * 60 * 1000);
        // 心跳时间 30s
        configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30 * 1000);
        configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 60 * 1000);

        // 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
        // latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
        // earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return configProps;
    }
}


介绍完配置,我们再看看GpsHandlerCallable 具体业务类的实现

实现 Runnable接口,创建一个线程的方式有很多,比如继承Thread类,这里我们选择实现Runnable,然后线程池那边直接submit 来将任务提交到线程池

@Slf4j
@Service
public class GpsHandlerCallable implements Runnable {

    private  Map<String, ArrayList<JSONObject>> gpsMap;

    private  String redisSetKey;


    public GpsHandlerCallable(Map<String, ArrayList<JSONObject>> gpsMap) {
        this.gpsMap = gpsMap;

    }

    @Autowired
    private MongoTemplate mongoTemplate;

    private final UpdateOptions updateOptions = new UpdateOptions().upsert(true);


    @Override
    public void run() {
        if (this.gpsMap.size() > 0) {
            handData();
        }
    }

    public void handData() {
        insertGps();
    }

    /**
     * 清除报警数据(施工巡检)
     */
    private void insertGps(){
        mongoTemplate.insert(this.gpsMap,"goosTable");
    }
}

好了,整体流程就结束了文章来源地址https://www.toymoban.com/news/detail-599149.html

到了这里,关于spring 中kafka的基本使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring-Kafka生产者源码分析

    本文主要概括Spring Kafka生产者发送消息的主流程 代码准备: SpringBoot项目中maven填加以下依赖 消息发送使用 KafkaTemplate 启动类 KafkaAutoConfiguration 有两个地方需要关注 其中的 ProducerFactory 使用的是 DefaultKafkaProducerFactory 在发送消息之前,Spring Kafka会先创建 Producer ,返回的是 Clos

    2024年02月09日
    浏览(40)
  • spring-kafka中ContainerProperties.AckMode详解

      近期,我们线上遇到了一个性能问题,几乎快引起线上故障,后来仅仅是修改了一行代码,性能就提升了几十倍。一行代码几十倍,数据听起来很夸张,不过这是真实的数据,线上错误的配置的确有可能导致性能有数量级上的差异,等我说完我们这个性能问题你就清楚了

    2024年02月08日
    浏览(48)
  • Spring-Kafka 发送消息的两种写法

    本文主要是使用 Java 语言中 spring-kafka 依赖 对 Kafka 进行使用。 使用以下依赖对 Kafka 进行操作: 需要更改版本的话,可以前往:Maven 仓库 创建项目,先创建一个简单的 Maven 项目,删除无用的包、类之后,使用其作为一个父级项目。 以下内容如果在项目启动时报这个错: 把

    2024年01月20日
    浏览(48)
  • 浅析Spring-kafka源码——消费者模型的实现

    SpringBoot项目中的消费端实现而言,Spring-kafka没有用原生的ConsumerConnector,,而是借助原生client的拉取消息功能做了自己的消费模型的实现,提供了@KafkaListener注解这种方式实现消费。 开发中在使用Spring-kafka时,一般也都是通过使用@KafkaListener注解的方法来实现消息监听和消费。

    2024年02月09日
    浏览(56)
  • Spring-Kafka如何实现批量消费消息并且不丢失数据

    先给答案: 某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同

    2024年02月13日
    浏览(48)
  • Spring boot使用Kafka Java反序列化漏洞 CVE-2023-34040

    背景:公司项目扫描到 Spring-Kafka上使用通配符模式匹配进行的安全绕过漏洞 CVE-2023-20873 中等风险 | 2023年8月23日 | CVE-2023-34040 在Spring for Apache Kafka 3.0.9及更早版本以及2.9.10及更早版本中,存在可能的反序列化攻击向量,但只有在应用了不常见的配置时才会出现。攻击者必须在

    2024年02月07日
    浏览(51)
  • 使用Spring Boot和Kafka实现消息订阅和发送

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月11日
    浏览(41)
  • 使用Spring Boot和Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月11日
    浏览(42)
  • 使用 Spring Boot 整合 Kafka:实现高效的消息传递

    Kafka 是一种流处理平台,用于在分布式系统中处理高吞吐量的数据流。它是一种基于发布订阅模式的消息系统,能够处理来自多个应用程序的数据流。Kafka 具有高度的可扩展性、可靠性和性能,使得它成为处理大数据的流行选择。 Spring Boot 是一种开源框架,用于简化 Java 应用

    2024年02月14日
    浏览(48)
  • spring 中kafka的基本使用

    默认大家都是maven工程 第一步 在pom文件中 引入 在项目中创建多线程消费者的类,因为频繁创建和销毁线程也会有性能消耗,所以先创建线程池   那kafka的相关配置在哪里呢 比如链接kafka的服务地址啊,组id啊,我们从上面图片中看到了个 config文件夹,在里面创建 BatchConsum

    2024年02月16日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包