最简单的SpringCloudStream集成Kafka教程

这篇具有很好参考价值的文章主要介绍了最简单的SpringCloudStream集成Kafka教程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

开发中,服务与服务之间通信通常会用到消息中间件,如果我们使用了某一个MQ,那么消息中间件与我们的系统算是高耦合。将来有一天,要替换成另外的MQ,我们的改动就会比较大。为了解决这个问题,我们可以使用Spring Cloud Stream 来整合我们的消息中间件,降低耦合度,使服务可以更多关注自己的业务逻辑等。

今天为大家带来一个人人可实操的SpringCloudStream集成Kafka的快速入门示例。

1 前言

SpringCloudStream是一个构建高扩展性的事件消息驱动的微服务框架。简单点说就是帮助你操作MQ,可以与底层MQ框架解耦。将来想要替换MQ框架的时候会比较容易。最简单的SpringCloudStream集成Kafka教程

Kafka是一个分布式发布 - 订阅消息系统,源于LinkedIn的一个项目,2011年成为开源Apache项目。

ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册,Kafka的实现同时也依赖于zookeeper。

2 Windows搭建简单的Kafka

2.1 启动zookeeper

使用Kafka首先需要启动zookeeper,windows中搭建zookeeper也很简单。以下几步即可完成:

  1. 下载zookeeper (本文使用3.7.0版本,下载链接在文章末尾。)
  2. 配置基本环境变量:
    1. 将conf文件夹下面的 zoo_sample.cfg 重命名zoo.cfg。并修改其工作目录dataDir。
    2. bin文件夹下面有zkEnv.cmd有zookeeper相关的配置,其中就包括JAVA_HOME,所以系统环境变量需要配置JAVA_HOME,或者直接用Java的路径来替换。
  3. 启动,在bin目录下运行zkServer.cmd脚本启动zookeeper。

默认启动端口2181为。

正常启动如下:最简单的SpringCloudStream集成Kafka教程

2.2 搭建Kafka

本地使用kafka同样也是如下的几个步骤:

  1. 下载Kafka(本文使用2.11版本,下载链接见文章末尾)
  2. 环境变量配置:
  3. 查看config文件下面的 server.properties配置文件中的zookeeper的配置zookeeper.connect=localhost:2181
  4. 在bin/windows文件夹下面kafka-run-class.bat文件中有JAVA_HOME的配置,同样也可以直接改成系统的Java路径.
  5. 在kafka根目录下使用如下命令启动kafka,并在zookeeper中注册。# .\bin\windows\kafka-server-start.bat .\config\server.properties
  6. 创建topic,在bin\windows目录下使用如下命令。创建名称为“test”的topickafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  7. 使用windows命令窗口的producer和consumer,在bin\windows目录下使用如下命令#test topic的消息生产者
    kafka-console-producer.bat --broker-list localhost:9092 --topic test#test topic的消息消费者
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test#test topic的消息消费者(从头消费)
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic

kafka启动windows界面如下最简单的SpringCloudStream集成Kafka教程

3 SpringCloudStream集成Kafka

3.1 引入依赖

由于我们直接使用Spring Cloud Stream 集成Kafka,官方也已经有现成的starter。

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-kafka</artifactId>
   <version>2.1.0.RELEASE</version>
</dependency>

3.2 关于kafka的配置

spring:
  application:
    name: shop-server
  cloud:
    stream:
      bindings:
        #配置自己定义的通道与哪个中间件交互
        input: #MessageChannel里Input和Output的值
          destination: test #目标主题 相当于kafka的topic
        output:
          destination: test1 #本例子创建了另外一个topic (test1)用于区分不同的功能区分。
      default-binder: kafka #默认的binder是kafka
  kafka:
    binder:
      zk-nodes: localhost:2181
    bootstrap-servers: localhost:9092 #kafka服务地址,集群部署的时候需要配置多个,
    consumer:
      group-id: consumer1 
    producer:
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      client-id: producer1
server:
  port: 8100

3.3 消费者示例

首先需要定义SubscribableChannel 接口方法使用Input注解。

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

然后简单的使用 StreamListener 监听某一通道的消息。

@Service
@EnableBinding(Sink.class)
public class MessageSinkHandler {

    @StreamListener(Sink.INPUT)
    public void handler(Message<String> msg){
        System.out.println(" received message : "+msg);

    }
}

cloud stream配置中绑定了对应的Kafka topic,如下

cloud:
  stream:
    bindings:
      #配置自己定义的通道与哪个中间件交互
      input: #SubscribableChannel里Input值
        destination: test #目标主题

我们使用Kafka console producer 生产消息。

kafka-console-producer.bat --broker-list localhost:9092 --topic test

同时启动我们的示例SpringBoot项目,使用producer推送几条消息。最简单的SpringCloudStream集成Kafka教程

我们同时启动一个Kafka console consumer

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

消费结果如下:最简单的SpringCloudStream集成Kafka教程

Spring Boot 项目消费消息如下:最简单的SpringCloudStream集成Kafka教程 

3.4 生产者示例

首先需要定义生产者MessageChannel,这里会用到Output注解

public interface KafkaSource {
    String OUTPUT = "output";

    @Output(KafkaSource.OUTPUT)
    MessageChannel output();
}

使用MessageChannel 发送消息。

@Component
public class MessageService {

    @Autowired
    private KafkaSource source;

    public Object sendMessage(Object msg) {
        source.output().send(MessageBuilder.withPayload(msg).build());
        return msg;
    }

定义一个Rest API 来触发消息发送

@RestController
public class MessageController {

    @Autowired
    private MessageService messageService;

    @GetMapping(value = "/sendMessage/{msg}")
    public String sendMessage(@PathVariable("msg") String msg){
        messageService.sendMessage("messageService send out : " + msg + LocalDateTime.now());
        return "sent message";
    }
}

配置中关于producer的配置如下

cloud:
  stream:
    bindings:
      input: 
        destination: test 
      output:
        destination: test1 #目标topic

启动SpringBoot App, 并触发如下API call

http://localhost:8100/sendMessage/JavaNorthProducer

我们同时启动一个Kafka console consumer,这里我们使用另一个test1 topic

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1

console consumer消费消息如下:最简单的SpringCloudStream集成Kafka教程

总结

本章初步介绍了Spring Cloud Stream 集成Kafka的简单示例,实现了简单的发布-订阅功能。但是Spring Cloud Stream肯定还有更多的功能,我们后续还将继续深入学习更多Stream的功能。文章来源地址https://www.toymoban.com/news/detail-407036.html

到了这里,关于最简单的SpringCloudStream集成Kafka教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 史上最简单的Kafka安装教程

     解压apache-zookeeper-3.8.0-bin.tar.gz到指定目录,复制conf目录下zoo_sample.cfg到zoo.cfg,并修改配置。 进入bin目录,启动zookeeper 解压kafka_2.12-3.0.0.tgz到指定目录。 进入到config目录,修改server.properties配置 advertised.listeners才是真正的对外代理地址,listeners的作用不是对外提供服务代理,

    2024年02月07日
    浏览(51)
  • Python集成开发环境pycharm配置git详细教程

    Python集成开发环境pycharm配置git详细教程 Pycharm是一款很优秀的python集成开发环境,而git则是一个开源的分布式版本控制系统。接下来我们就pycharm中如何配置git,并对一些常用的操作进行详述。 一、 在pycharm中配置本地git。 依次打开File–Settings—Version Control,在Path to Git exec

    2024年02月02日
    浏览(49)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(58)
  • Android开发教程:如何利用Service实现简单的音乐播放

    android音乐播放效果,简单的服务开启。 这里将用到android的四大组件之一:Service 注意:Service是自大组件之一,需要注册。 什么是服务? 1:“Service” 意思即“服务”的意思, 像 Windows 上面的服务一样,服务是在后台上运行,承担着静悄悄的不为人所注意的工作。 2:Serv

    2023年04月09日
    浏览(39)
  • 通讯编程006——NodeJS OPC UA Client开发简单教程

    本文介绍如何在NodeJS环境下开发OPC UA Client,通过本文可以对OPC UA的基本概念有所了解,掌握OPC UA的本质。相关软件请登录网信智汇(wangxinzhihui.com)。 开发步骤如下: 1)首先需要安装nodejs,要求版本至少是12。 2)创建项目目录,在cmd下进入项目目录下,执行如下指令: 1)n

    2024年02月09日
    浏览(38)
  • python简单易懂的小程序,python小程序开发教程

    本篇文章给大家谈谈python简单易懂的小程序,以及python小程序开发教程,希望对各位有所帮助,不要忘了收藏本站喔。 有不少同学学完 Python 后仍然很难将其灵活运用。我整理 37 个 Python 入门的小程序70个python练手项目。在实践中应用 Python 会有事半功倍的效果。 例子1:华氏

    2024年01月22日
    浏览(48)
  • SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

    这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。 在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭

    2024年02月12日
    浏览(41)
  • vscode stm32cubemx 优雅开发stm32,最简单步骤教程

    下载STM32cubeMX,这个大家可以自己在stm的官网下载到 下载VSCode 下载arm-none-eabi-gcc 下载MinGW-w64,为了实现里面的makefile 等功能 下载OpenOCD这里用来调试stm32,支持jlink stlink daplink 上述安装步骤1、安装步骤2在这里比较简单,就不赘述了  此处我们下载zip包,方便安装。 以我为例

    2024年02月12日
    浏览(55)
  • 瑞芯微RK3588开发板的固件烧录完整教程(简单好上手)

    ​​​​​​​本期技术干货内容分享嵌入式开发板固件烧录教程,以英码嵌入式开发板EVM3588为例,该发板搭载的是瑞芯微RK3588平台,烧录方式采用最常用的USB_OTG烧录,简单又方便! 开发环境 主机:Ubuntu 20.04 开发板:英码科技EVM3588开发板 烧录工具:RKDevTool_Release_v2.92.zi

    2024年02月11日
    浏览(44)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包