kafka--技术文档--spring-boot集成基础简单使用

这篇具有很好参考价值的文章主要介绍了kafka--技术文档--spring-boot集成基础简单使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

阿丹:

        查阅了很多资料了解到,使用了spring-boot中整合的kafka的使用是被封装好的。也就是说这些使用其实和在linux中的使用kafka代码的使用其实没有太大关系。但是逻辑是一样的。这点要注意!

使用spring-boot整合kafka

1、导入依赖

核心配置为:

  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

如果在下面规定了spring-boot的版本那么就不需要再使用版本号,如果没有的话就需要规定版本号。 

      <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!--配置文件报错问题-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

2、写入配置

#服务端口号
server:
  port: 8025

spring:
  main:
    allow-circular-references: true
  application:
    name: producer
  kafka:
    bootstrap-servers: kafka的ip地址:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

3、生产者

将发送封装为一个工具类

    public void send(Object obj){
        String obj2String = JSON.toJSONString(obj);
        log.info("准备发送消息为:{}",obj2String);

        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj2String);
        //回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                //成功的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());
            }
        });

4、消费者

kafka--技术文档--spring-boot集成基础简单使用,各种MQ信息消费中间件,kafka,linq,数据库

 如果需要使用多线程来监听的话使用这个策略。

@KafkaListener(topics = "Hello-Kafka", groupId = "group1")
public void onMessage1(ConsumerRecord<?, ?> record) {
    // 消息处理逻辑
}

@KafkaListener(topics = "Hello-Kafka", groupId = "group2")
public void onMessage2(ConsumerRecord<?, ?> record) {
    // 消息处理逻辑
}

以上就可以简单实现一个kafka的监听消费。文章来源地址https://www.toymoban.com/news/detail-672526.html

到了这里,关于kafka--技术文档--spring-boot集成基础简单使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka--技术文档-基本概念-《快速了解kafka》

    学习一种新的消息中间键,卡夫卡!!! 官网网址 Apache Kafka         Kafka是一种开源的分布式流处理平台,由Apache软件基金会开发,用Scala和Java编写。它是一个高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据。这种动作可以是网页浏览、

    2024年02月11日
    浏览(39)
  • kafka--技术文档--基本docker中安装<单机>-linux

    阿丹小科普:         Kafka在0.11.0.0版本之后不再依赖Zookeeper,而是使用基于Raft协议的Kafka自身的仲裁机制来替代Zookeeper。具体来说,Kafka 2.8.0版本是第一个不需要Zookeeper就可以运行Kafka的版本,这被称为Kafka Raft Metadata mode(Kafka Raft元数据模式)。引入基于Raft协议的KRaft模

    2024年02月11日
    浏览(33)
  • spring-boot集成mybatis真的很简单吗?

    在日常的后端开发中,使用mybatis作为DAO层的持久框架已经是惯例。但很多时候都是在别人搭好的框架中进行开发,对怎么搭建环境是一知半解,今天就来实践下。 来看下集成mybatis需要哪些步骤, 1、确定环境及依赖 2、配置文件; 3、测试 这里, 基于springboot集成mybatis。 先

    2024年02月08日
    浏览(35)
  • spring-boot集成spring-brick实现动态插件

    spring-boot集成spring-brick实现动态插件 项目结构 需求实现 spring-boot集成spring-brick 环境说明 1. 主程序集成spring-brick 第一步:引入相关依赖 第二步:修改程序入口方法 第三步:编写配置 第四步:设置maven插件 2. 准备plugin-api 第一步:引入相关依赖 第二步:引入相关依赖 3. 实现

    2024年02月14日
    浏览(33)
  • websocket--技术文档--spring后台+vue基本使用

            给大家分享一个可以用来进行测试websocket的网页,个人觉得还是挺好用的. WebSocket在线测试工具 还有一个小家伙 ApiPost也可以进行使用websocket的测试。 在Spring Boot中使用WebSocket建立服务端,可以按照以下步骤进行: 确保的Spring Boot项目已经创建并配置好。 在项目的

    2024年02月09日
    浏览(33)
  • MongoDB文档-进阶使用-spring-boot整合使用MongoDB---MongoRepository完成增删改查

    阿丹:         之前学习了在MongoDB客户端上的MongoDB语句现在将MongoDB整合到spring项目。 MongoDB文档--基本概念_一单成的博客-CSDN博客 MongoDB文档--基本安装-linux安装(mongodb环境搭建)-docker安装(挂载数据卷)-以及详细版本对比_一单成的博客-CSDN博客 MongoDB文档--基本安装-linu

    2024年02月14日
    浏览(36)
  • 首选Spring MVC实战架构文档:GitHub上率先发布,引领技术革新

    Spring MVC是一个基于Java的Web框架,它遵循MVC设计模式,实现了请求驱动类型的轻量级架构。通过将Model、View和Controller分离,Spring MVC将Web层的职责进行了清晰的划分,使得复杂的Web应用程序变得结构清晰、易于开发和维护。 其中,DispatcherServlet是Spring MVC框架的核心组件。作为

    2024年02月02日
    浏览(42)
  • Spring Boot集成JasperReport生成文档

    由于工作需要,要实现后端根据模板动态填充数据生成PDF文档,通过技术选型,使用Ireport5.6来设计模板,结合JasperReports5.6工具库来调用渲染生成PDF文档。 一、使用Ireport designer 5.6设计模板 ireport的使用由于时间关系不便多说,设计好之后,将其进行编译生成jasper文件,然后将

    2024年02月09日
    浏览(31)
  • Spring Boot集成Kafka详解

    Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。 1. 添加依赖 首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。 2. 配置Kafka 接下来,

    2024年02月09日
    浏览(30)
  • Spring Boot集成kafka的相关配置

    额外依赖只需要这一个,kafka-client 不是springboot 的东西,那是原生的 kafka 客户端, kafka-test也不需要,是用代码控制broker的东西。 也可以用java类Config 方式配置,如果没有特殊要求,可以只用spring配置的方式 注意加上@Component,被spring管理监听才有效 注意这里不能用@Value注解

    2024年02月07日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包