【Spring Boot】集成Kafka实现消息发送和订阅

这篇具有很好参考价值的文章主要介绍了【Spring Boot】集成Kafka实现消息发送和订阅。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一,新建Spring Boot

最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
注意Type选Maven,java选8,其他默认
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列

1,Maven配置

点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列

2,无法识别为SpringBoot项目

在maven配置没问题的前提下,IDEA无法识别这是一个Spring Boot项目,倒腾半天,终于发现问题原因所在=======>是Maven版本太高的原因
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
把.mvn/wrapper目录下的maven-wrapper.properties文件第一行的版本号降低,比如说降为3.5.4,然后重新点下Maven的同步按钮
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列

3,无效的源发行版

接下来运行项目报错:java: 无效的源发行版: 14
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
修改pom.xml中java.version值为8,原来是17

	<properties>
        <java.version>17</java.version>
    </properties>

4,无法访问SpringApplication

继续运行,继续报错【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
降低spring-boot-starter-parent版本,原来是3.1.3,改为2.7.2

5,运行直接Finish

继续运行,没报错,服务直接Finished
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
需要添加web依赖

 		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

6,服务运行成功

终于,一个空的spring boot项目成功跑起来了,喜极而泣
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列

二,安装启动Kafka

1,下载

官网=>https://kafka.apache.org/downloads,下载最新版的kafka,目前是3.5.1
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列

2,配置

解压到D盘Config目录下即完成安装,目录为D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties

broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka

(2) zookeeper.properties

dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper

3,启动

先启动zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties	

再启动kafka

bin\windows\kafka-server-start.bat config\server.properties

停止的时候,先停止kafka,再停止zookeeper,直接ctrl+c停止

4,其他命令

1,查看topic列表

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2,查看topic具体信息

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

3,创建topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

三,生产消费消息

1,加入依赖

 		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2,yam配置文件

application.yaml

spring:
  profiles:
    active: dev

application-dev.yaml

server:
  port: 8082
  servlet:
    context-path: /test-kafka

spring:
  cache:
    type: ehcache
    config: classpath:ehcache.xml
  jpa:
    database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
      retries: 10

3,报错enabled mechanisms are []

Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
这个错误我本地测试下来是因为没把账号密码配置这块注释掉
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列

4,生产者生产消息

@Slf4j
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(String content) {
        String topic = "test_topic";
        kafkaTemplate.send(topic, content).addCallback(success -> {
            String topic = success.getRecordMetadata().topic();
            int partition = success.getRecordMetadata().partition();
            long offset = success.getRecordMetadata().offset();
            log.info("发送成功:主题:{},分区:{},偏移量:{}",topic,partition,offset);
        }, failure -> {
            log.info("发送失败:{}",failure.getMessage());
        });
        return "发送成功";
    }
}

5,订阅和消费消息

一,订阅主题
1,获取消费者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * kafka消费者配置
 * @author liuxunming
 */
@Configuration
@Component
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    public KafkaConsumer<String, String> createConsumer() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }

}

2,订阅topic

 		KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();
        consumer.subscribe(Collections.singleton("traffic"));

3,拉取消息

 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 for (ConsumerRecord<String, String> record : records) {
		String key = record.key();
		String value = record.value();
		log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}

4,消费位移,释放资源

// 提交消费位移
consumer.commitSync();
// 关闭消费者以释放资源
consumer.close();

二,点对点模式

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test_topic"})
    public void handlerMsg(String content) {
        log.info("接收到消息:消息值:{} ",content);
    }
}

6,接口

@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/sendMessage")
    public String sendMessage(@RequestParam String content) {
        kafkaProducer.sendMessage(content);
        return "ok";
    }
}

7,测试结果

【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列
接收到消息
【Spring Boot】集成Kafka实现消息发送和订阅,后端,spring boot,kafka,后端,订阅,消息队列文章来源地址https://www.toymoban.com/news/detail-708058.html

四,参考博文

  1. 解决IDEA无法识别SpringBoot项目
  2. SpringBoot从入门到精通(十二)SpringBoot集成Kafka
  3. Kafka的下载安装以及使用
  4. Kafka消息消费流程详解
  5. Kafka之Consumer使用与基本原理

到了这里,关于【Spring Boot】集成Kafka实现消息发送和订阅的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实战指南:使用Spring Boot实现消息的发送和接收

    当涉及到消息发送和接收的场景时,可以使用Spring Boot和消息中间件RabbitMQ来实现。下面是一个简单的示例代码,展示了如何在Spring Boot应用程序中创建消息发送者和接收者,并发送和接收一条消息。 首先,你需要进行以下准备工作 确保你已经安装了Java和Maven,并设置好相应

    2024年02月11日
    浏览(39)
  • Spring Boot集成WebSocket实现消息推送

    项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询、comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

    2023年04月08日
    浏览(34)
  • 使用 Spring Boot 整合 Kafka:实现高效的消息传递

    Kafka 是一种流处理平台,用于在分布式系统中处理高吞吐量的数据流。它是一种基于发布订阅模式的消息系统,能够处理来自多个应用程序的数据流。Kafka 具有高度的可扩展性、可靠性和性能,使得它成为处理大数据的流行选择。 Spring Boot 是一种开源框架,用于简化 Java 应用

    2024年02月14日
    浏览(36)
  • Spring Boot 集成 WebSocket 实现服务端推送消息到客户端

          假设有这样一个场景:服务端的资源经常在更新,客户端需要尽量及时地了解到这些更新发生后展示给用户,如果是 HTTP 1.1,通常会开启 ajax 请求询问服务端是否有更新,通过定时器反复轮询服务端响应的资源是否有更新。                         在长时间不更新

    2024年02月16日
    浏览(39)
  • 【Spring Boot 3】【Redis】消息发布及订阅

    软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或少的时间、检索不止一篇资料才能得出一个可工作的DEMO,这占用了我大量的时

    2024年01月21日
    浏览(37)
  • Spring RabbitMQ那些事(1-交换机配置&消息发送订阅实操)

    在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。 我们先上应用启动配置文件 application.yml ,如下: 备注:这里我们指定了 RabbitListenerContainerFactory 的类型

    2024年01月17日
    浏览(30)
  • Spring Boot进阶(48):【实战教程】SpringBoot集成WebSocket轻松实现实时消息推送

            WebSocket是一种新型的通信协议,它可以在客户端与服务器端之间实现双向通信,具有低延迟、高效性等特点,适用于实时通信场景。在SpringBoot应用中,集成WebSocket可以方便地实现实时通信功能,如即时聊天、实时数据传输等。         本文将介绍如何在Sprin

    2024年02月09日
    浏览(38)
  • 小程序实现消息订阅及发送

    在我们的家政服务小程序中,用户可以新增预约。一般的场景是新增预约的时候提醒用户接收通知,在状态变更的时候我们来发送订阅消息。本篇我们来讲解一下小程序订阅消息功能的开发。 要想发送订阅消息,首先需要选用一个消息模板。打开你自己的小程序后台,点击订

    2024年02月07日
    浏览(27)
  • springboot集成kafka详细步骤(发送及监听消息示例)

    1、本机的kafka环境配置,不再赘述 2、添加 pom 文件 3、配置application.yml 4、复写kafka的相关配置类:生产、消费相关配置 5、生产、消费的伪代码 6、测试消息发送 经过测试!

    2024年02月11日
    浏览(26)
  • Spring Boot进阶(62):Redis魔法:用发布订阅功能打造高效消息队列!

            话说,玩过MQ的同学可能都知道【发布订阅】模式,不就是一种消息传递方式嘛;如果没玩过,那也不打紧,下文我会简单做个科普。但是对于Redis如何实现MQ的【发布订阅】功能?这才是问题的关键,有的同学就说“压根没玩过呀!不造” ,哈哈,bug菌既然敢写便有

    2024年02月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包