在Springboot中接收kafka消息

这篇具有很好参考价值的文章主要介绍了在Springboot中接收kafka消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

整体描述

之前写过一篇使用docker搭建kafka服务的文章,使用centos搭建kafka服务器Docker,本文主要简单将一下在springboot框架下,接收kafka服务器发过来的消息。

版本对应

由于使用springboot,管理版本时和springboot绑定的,我目前用的是springboot2.7,kafka的版本是2.1,这个版本也没啥影响,因为kafka服务器是向下兼容的,也就是说你的kafka服务器的版本是3.1,kafka客户端的版本使用3.1以下的,就都可以。

具体接入

1. pom引用

直接使用springboot框架带的kafka客户端,不指定版本号,引入的默认版本就是和springboot版本有关的。这块我看网上有指定版本号会报错的,因为springboot和kafka的版本是有对应关系的,如果引入的kafka版本和当前使用的springboot版本不兼容,就会报错。具体版本对应关系可以自己去网上搜一下。

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2. kafka参数配置

配置kafka参数,这块有两种方式,一是直接在配置文件里写,还有就是在代码里写,两种方式都可以,我这里就直接在springboot的config里写了,添加KafkaConsumerConfig.java:

/**
 * Kafka消费者配置类
 *
 * @author thcb
 * @date 2023-05-24
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    public final static String BOOTSTRAP_SERVERS = "192.168.1.100:9092";
    public final static String GROUP_ID = "test_group";

    @Bean
    @Conditional(KafkaCondition.class)
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);

        // 当使用批量监听器时需要设置为true
        factory.setBatchListener(true);

        return factory;
    }

    @Bean
    @Conditional(KafkaCondition.class)
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    @Conditional(KafkaCondition.class)
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Kafka地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // 是否自动提交offset偏移量(默认true)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交的频率(ms)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超时设置
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //请求超时时间
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
        // 键的反序列化方式
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量规则设置:
        // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }
}

注意BOOTSTRAP_SERVERS 需要根据自己的kafka服务器地址配置。这块由于使用的是config配置,所以在kafka服务无法访问时,springboot程序就会启动失败,这块根据实际情况处理吧,由于我现在的项目,kafka是第三方接口,用来接收第三方数据的,所以kafka服务器无法访问,不应该影响程序启动。所以这块我加了一个判断,如果kadka服务无法访问,就不进行kafka相关的初始化操作,也是使用springboot带的注解实现的。

3. 添加Conditional注解

这个注解就是提供一个判断,如果判断通过,就执行注解的内容,如果不通过,就不执行。
这块我们可以在注解里加一个判断kafka服务器的操作:

/**
 * kafka动态启动
 * kafka代理服务器正常时启动kafka服务
 * kafka代理服务器不可用时,不启动kafka服务
 *
 * @author thcb
 * @date 2023-05-24
 */
public class KafkaCondition implements Condition {
    public static final Logger log = LoggerFactory.getLogger(KafkaCondition.class);

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        URI uri = URI.create("http://" + KafkaConsumerConfig.BOOTSTRAP_SERVERS);
        String host = uri.getHost();
        int port1 = uri.getPort();
        boolean b = this.isHostConnectable(host, port1);
        log.info("matches:{}", b);
        return b;
    }

    /**
     * 判断kafka服务器,能否正常连接
     *
     * @param host
     * @param port
     * @return
     */
    public boolean isHostConnectable(String host, int port) {
        log.info("isHostConnectable:host:{},port:{}", host, port);
        Socket socket = new Socket();
        try {
            //判断kafka网络是否能联通,不能连通则返回false
            socket.connect(new InetSocketAddress(host, port), 2000);
        } catch (IOException e) {
            log.error("isHostConnectable:{}", ExceptionUtil.getExceptionMessage(e));
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                log.error("isHostConnectable:{}", ExceptionUtil.getExceptionMessage(e));
            }
        }
        return true;
    }
}

然后就在使用kafka的地方都加上这个注解,其实在KafkaConsumerConfig的配置类里,就已经加上了。

4. 添加listener

创建一个监听,直接监听kafka消息就可以了。其中主题根据kafka服务端发的主题确定,GROUP_ID 就是配置文件里设置的。

/**
 * Kafka消费者Listener
 *
 * @author thcb
 * @date 2023-05-24
 */
@Component
@Conditional(KafkaCondition.class)
public class EimpKafkaConsumerListener {

    public static final Logger log = LoggerFactory.getLogger(EimpKafkaConsumerListener.class);
    public final static String TOPIC = "test.topic";
    public final static String GROUP_ID = "test_group";

    //监听kafka消费
    @KafkaListener(topics = TOPIC, groupId = GROUP_ID, containerFactory = "kafkaListenerContainerFactory")
    @Conditional(KafkaCondition.class)
    public void onMessage(String message) {
        log.info("EimpKafkaConsumerListener onMessage:{}", message);
    }
}

这样就可以在程序里接收kafka的消息了,在前面我写的那个文章里,有开启服务端Demo的方法,可以在服务端模拟发消息,在程序的log里就能收到了。

总结

服务器搭建起来之后,接收kafka消息就简单多了。本文主要将接收kafka消息的方式整理了一下,还加了对kafka服务器是否可用的判断。文章来源地址https://www.toymoban.com/news/detail-743755.html

到了这里,关于在Springboot中接收kafka消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Filebeat部署+Kafka接收消息

    1.Filebeat官方下载地址 :https://www.elastic.co/cn/downloads/past-releases#filebeat 我下的是7.12版本,链接: link 2.上传压缩包 解压并重命名文件夹 tar -zxvf filebeat-7.12.0-linux-x86_64.tar.gz mv filebeat-7.12.0-linux-x86_64 filebeat home是用户目录,个人习惯放在这个,你们也可以放在别的目录里。 修改fileb

    2024年02月16日
    浏览(19)
  • 在前后端分离的项目中,Springboot vue,前端把json传到后端,后端用一个类接收,json中的数据是怎么转换类型的

    在前后端分离的项目中,前端通常会将数据以 JSON 格式传输给后端,后端需要将接收到的 JSON 数据转换为对应的类型。这个过程可以通过后端框架和库来自动完成。 在Spring Boot中,后端可以使用相关的库来实现JSON数据的转换。常见的库包括Jackson、Gson和FastJson等。这些库提供

    2024年02月13日
    浏览(67)
  • 页面数据类型为json,后端接受json数据

    取得input 的输入值然后编写json数据,JSON.stringify(student) 将student 转化为json对象

    2024年01月19日
    浏览(41)
  • Springboot开发时,对前端的请求参数,后端用于接受的实体类有没有必要校验为null?

    分析过程:         首先==null对于引用类型是判断这个对象有没有被加载到内存当中。对象的产生是由声明、是实列化、初始化三个过程.         初始化: RequestzbszAdd requestzbszAdd; 也就是声明一个变量         实列化:使用new         初始化:new  RequestzbszAdd()

    2024年02月09日
    浏览(42)
  • java后端该怎样来接受前端日期选择器传入的时间参数

    如果前端使用了日期选择器并且将选择的日期传给了Java后端,那么Java后端可以使用如下方法来接收日期参数: 在后端的方法中声明一个形参,类型为 java.util.Date 或 java.time.LocalDate ,然后在前端的请求中传入的日期参数会被自动封装成相应的日期对象。例如: 在后端的方法

    2024年02月13日
    浏览(43)
  • Java 数据库改了一个字段, 前端传值后端接收为null问题解决

    前端传值后端为null的原因可能有很多种,我遇到一个问题是,数据库修改了一个字段,前端传值了,但是后台一直接收为null值, 原因排查: 1、字段没有匹配上,数据库字段和前端字段传值不一致 2、大小写一定要注意 这个疏忽大意了 以上都改了还是null ~~~~! 3、get set方法

    2024年02月10日
    浏览(85)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(44)
  • spring cloud steam 整合kafka 进行消息发送与接收

    spring cloud steam : Binder和Binding Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于

    2024年02月10日
    浏览(41)
  • 【Spring连载】使用Spring访问 Apache Kafka(三)----接收消息

    当你使用消息监听器容器,你必须提供一个监听器接收数据。现在有8个受支持的接口做为消息监听器。见以下代码: MessageListener:,当使用自动提交或者容器管理的提交方法之一时,使用MessageListener接口处理从kafka consumer的poll方法收到的单个ConsumerRecord实例。 AcknowledgingMes

    2024年02月01日
    浏览(39)
  • 【快速开始】一个简单的Flask-SocketIO应用,完成后端推送消息接收与关闭

    本人使用环境及版本: Anaconda: 虚拟环境: Python版本:3.8.13 安装包及版本: Flask-SocketIO :5.3.4 eventlet :0.33.3 创建app.py文件(文件名随意,不过要与后面的运行脚本中指定的文件保持一致) cmd 或者 linux控制台运行即可 此时能看到如下图所示 此时访问http://0.0.0.0:5200(0.0.0

    2024年02月13日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包