消息驱动:如何使用 KafkaTemplate 集成 Kafka?

这篇具有很好参考价值的文章主要介绍了消息驱动:如何使用 KafkaTemplate 集成 Kafka?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

消息通信机制与 SpringCSS 案例

在引入消息通信机制及消息中间件之前,我们先来梳理下 SpringCSS 中的应用场景。

SpringCSS 案例中的消息通信场景

在 SpringCSS 案例中,一个用户的账户信息变动并不会太频繁。因为 account-service 和 customer-service 分别位于两个服务中,为了降低远程交互的成本,很多时候我们会想到先在 customer-service 本地存放一份用户账户的拷贝信息,并在客户工单生成过程时直接从本地数据库中获取用户账户。

在这样的设计和实现方式下,如果某个用户的账户信息发生变化,我们应该如何正确且高效地应对呢?此时消息驱动机制从系统扩展性角度为我们提供了一种很好的实现方案。

在用户账户信息变更时,account-service 首先会发送一个消息告知某个用户账户信息已经发生变化,然后通知所有对该消息感兴趣的服务。而在 SpringCSS 案例中,这个服务就是 customer-service,相当于是这个消息的订阅者和消费者。

通过这种方式,customer-service 就可以快速获取用户账户变更消息,从而正确且高效地处理本地的用户账户数据。

整个场景的示意图见下图:

kafka template,springboot,kafka,microsoft,数据库,spring,java,后端,spring boot

用户账户更新场景中的消息通信机制

上图中我们发现,消息通信机制使得我们不必花费太大代价即可实现整个交互过程,简单而方便。

消息通信机制简介

消息通信机制的整体工作流程如下图所示:

kafka template,springboot,kafka,microsoft,数据库,spring,java,后端,spring boot

消息通信机制示意图

上图中位于流程中间的就是各种消息中间件,消息中间件一般提供了消息的发送客户端和接收客户端组件,这些客户端组件会嵌入业务服务中。

消息的生产者负责产生消息,在实际业务中一般由业务系统充当生产者;而消息的消费者负责消费消息,在实际业务中一般是后台系统负责异步消费。

消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者。

上述概念构成了消息通信系统最基本的模型,围绕这个模型,业界已经有了一些实现规范和工具,代表性的规范有 JMS 、AMQP ,以及它们的实现框架 ActiveMQ 和 RabbitMQ 等,而 Kafka 等工具并不遵循特定的规范,但也提供了消息通信的设计和实现方案。

本节课我们重点关注 Kafka,后续的两个课时中我们再分别介绍 ActiveMQ 和 RabbitMQ。

与前面介绍的 JdbcTemplate 和 RestTemplate 类似,Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是 KafkaTemplate。

使用 KafkaTemplate 集成 Kafka

在讨论如何使用 KafkaTemplate 实现与 Kafka 之间的集成方法之前,我们先来简单了解 Kafka 的基本架构,再引出 Kafka 中的几个核心概念。

Kafka 基本架构

Kafka 基本架构参考下图,从中我们可以看到 Broker、Producer、Consumer、Push、Pull 等消息通信系统常见概念在 Kafka 中都有所体现,生产者使用 Push 模式将消息发布到 Broker,而消费者使用 Pull 模式从 Broker 订阅消息。

kafka template,springboot,kafka,microsoft,数据库,spring,java,后端,spring boot

Kafka 基本架构图

在上图中我们注意到,Kafka 架构图中还使用了 Zookeeper。

Zookeeper 中存储了 Kafka 的元数据及消费者消费偏移量(Offset),其作用在于实现 Broker 和消费者之间的负载均衡。因此,如果我们想要运行 Kafka,首先需要启动 Zookeeper,再启动 Kafka 服务器。

在 Kafka 中还存在 Topic 这么一个核心概念,它是 Kafka 数据写入操作的基本单元,每一个 Topic 可以存在多个副本(Replication)以确保其可用性。每条消息属于且仅属于一个 Topic,因此开发人员通过 Kafka 发送消息时,必须指定将该消息发布到哪个 Topic。同样,消费者订阅消息时,也必须指定订阅来自哪个 Topic 的信息。

另一方面,从组成结构上讲,一个 Topic 中又可以包含一个或多个分区(Partition),因此在创建 Topic 时我们可以指定 Partition 个数。

KafkaTemplate 是 Spring 中提供的基于 Kafka 完成消息通信的模板工具类,而要想使用这个模板工具类,我们必须在消息的发送者和消费者应用程序中都添加如下 Maven 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
使用 KafkaTemplate 发送消息

KafkaTemplate 提供了一系列 send 方法用来发送消息,典型的 send 方法定义如下代码所示:

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
}

在上述方法实际传入了两个参数,一个是消息对应的 Topic,另一个是消息体的内容。通过该方法,我们就能完成最基本的消息发送过程。

请注意,在使用 Kafka 时,我们推荐事先创建好 Topic 供消息生产者和消费者使用, 通过命令行创建 Topic 的方法如下代码所示:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic springcss.account.topic

这里创建了一个名为“springcss.account.topic”的 Topic,并指定它的副本数量和分区数量都是 3。

事实上,我们在调用 KafkaTemplate 的 send 方法时,如果 Kafka 中不存在该方法中指定的 Topic,它就会自动创建一个新的 Topic。

另一方面,KafkaTemplate 也提供了一组 sendDefault 方法,它通过使用默认的 Topic 来发送消息,如下代码所示:

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
    return send(this.defaultTopic, data);
}

从代码中我们可以看到,在上述 sendDefault 方法内部中也是使用了 send 方法完成消息的发送过程。

那么,如何指定这里的 defaultTopic 呢?在 Spring Boot 中,我们可以使用如下配置项完成这个工作。

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: demo.topic

现在,我们已经了解了通过 KafkaTemplate 发送消息的实现方式,KafkaTemplate 高度抽象了消息的发送过程,整个过程非常简单。

接下来我们切换下视角,看看如何消费所发送的消息。

使用 @KafkaListener 注解消费消息

首先需要强调一点,通过翻阅 KafkaTemplate 提供的类定义,我们并未找到有关接收消息的任何方法,这实际上与 Kafka 的设计思想有很大关系。

这点也与本课程后续要介绍的 JmsTemplate 和 RabbitTemplate 存在很大区别,因为它们都提供了明确的 receive 方法来接收消息。

从前面提供的 Kafka 架构图中我们可以看出,在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

在 Spring 中提供了一个 @KafkaListener 注解实现监听器,该注解定义如下代码所示:

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
    String id() default "";
    String containerFactory() default "";
    //消息 Topic
    String[] topics() default {};
    //Topic 的模式匹配表达式
    String topicPattern() default "";
	//Topic 分区
    TopicPartition[] topicPartitions() default {};
    String containerGroup() default "";
    String errorHandler() default "";
	//消息分组 Id
    String groupId() default "";
    boolean idIsGroup() default true;
    String clientIdPrefix() default "";
    String beanRef() default "__listener";
}

上述代码中我们可以看到 @KafkaListener 的定义比较复杂,我把日常开发中常见的几个配置项做了注释。

在使用 @KafkaListener 时,最核心的操作是设置 Topic,而 Kafka 还提供了一个模式匹配表达式可以对目标 Topic 实现灵活设置。

在这里,我们有必要强调下 groupId 这个属性,这就涉及 Kafka 中另一个核心概念:消费者分组(Consumer Group)。

设计消费者组的目的是应对集群环境下的多服务实例问题。显然,如果采用发布-订阅模式会导致一个服务的不同实例可能会消费到同一条消息。

为了解决这个问题,Kafka 中提供了消费者组的概念。一旦我们使用了消费组,一条消息只能被同一个组中的某一个服务实例所消费。

消费者组的基本结构如下图所示:

kafka template,springboot,kafka,microsoft,数据库,spring,java,后端,spring boot

Kafka 消费者组示意图

使用 @KafkaListener 注解时,我们把它直接添加在处理消息的方法上即可,如下代码所示:

@KafkaListener(topics = “demo.topic”)
public void handlerEvent(DemoEvent event) {
    //TODO:添加消息处理逻辑
}

当然,我们还需要在消费者的配置文件中指定用于消息消费的配置项,如下代码所示:

spring:      
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: demo.topic
    consumer:
      group-id: demo.group

可以看到,这里除了指定 template.default-topic 配置项之外,还指定了 consumer. group-id 配置项来指定消费者分组信息。

在 SpringCSS 案例中集成 Kafka

介绍完 KakfaTemplate 的基本原理后,我们将在 SpringCSS 案例中引入 Kafka 实现 account-service 与 customer-service 之间的消息通信。

实现 account-service 消息生产者

首先,我们新建一个 Spring Boot 工程,用来保存用于多个服务之间交互的消息对象,以供各个服务使用。

我们将这个代码工程简单命名为 message,并添加一个代表消息主体的事件 AccountChangedEvent,如下代码所示:

package com.springcss.message;
 
public class AccountChangedEvent implements Serializable {
    //事件类型
    private String type;
    //事件所对应的操作(新增、更新和删除)
    private String operation;
    //事件对应的领域模型
    private AccountMessage accountMessage;
    //省略 getter/setter
}

上述 AccountChangedEvent 类包含了 AccountMessage 对象本身以及它的操作类型,而 AccountMessage 与 Account 对象的定义完全一致,只不过 AccountMessage 额外实现了用于序列化的 Serializable 接口而已,如下代码所示:

public class AccountMessage implements Serializable {
        
    private Long id;
    private String accountCode;    
    private String accountName;
}

定义完消息实体之后,我们在 account-service 中引用了一个 message 工程,并添加了一个 KafkaAccountChangedPublisher 类用来实现消息的发布,如下代码所示:

@Component("kafkaAccountChangedPublisher")
public class KafkaAccountChangedPublisher {
 
    @Autowired
    private KafkaTemplate<String, AccountChangedEvent> kafkaTemplate;
      
    @Override
    protected void publishEvent(AccountChangedEvent event) {
        kafkaTemplate.send(AccountChannels.SPRINGCSS_ACCOUNT_TOPIC, event);
    }
}

在这里可以看到,我们注入了一个 KafkaTemplate 对象,然后通过它的 send 方法向目标 Topic 发送了消息。

这里的 AccountChannels.SPRINGCSS_ACCOUNT_TOPIC 就是 "springcss.account.topic",我们需要在 account-service 中的配置文件中指定同一个 Topic,如下代码所示:

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: springcss.account.topic
    producer:
      keySerializer: org.springframework.kafka.support.serializer.JsonSerializer
      valueSerializer: org.springframework.kafka.support.serializer.JsonSerializer

注意到这里,我们使用了 JsonSerializer 对发送的消息进行序列化。

实现 customer-service 消息消费者

针对服务消费者 customer-service,我们先来看它的配置信息,如下代码所示:

spring:      
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: springcss.account.topic
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: springcss_customer
      properties:
        spring.json.trusted.packages: com.springcss.message

相较消息生产者中的配置信息,消息消费者的配置信息多了两个配置项,其中一个是 group-id,通过前面内容的介绍,我们已经知道这是 Kafka 消费者特有的一个配置项,用于指定消费者组。

而另一个配置项是 spring.json.trusted.packages,用于设置 JSON 序列化的可行包名称,这个名称需要与 AccountChangedEvent 类所在的包结构一致,即这里指定的 com.springcss.message。文章来源地址https://www.toymoban.com/news/detail-854119.html


到了这里,关于消息驱动:如何使用 KafkaTemplate 集成 Kafka?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(33)
  • Java集成消息队列Kafka

    在使用Maven构建Java项目时,你可以通过添加Kafka的Maven依赖来引入Kafka相关的库。下面是Kafka的Maven坐标: 将上述依赖坐标添加到你的项目的pom.xml文件中,即可下载并引入Kafka客户端库。请注意,版本号可能会有所不同,你可以根据自己的需求选择最合适的版本。 另外,如果你

    2024年01月18日
    浏览(33)
  • Springboot实战16 消息驱动:如何使用 RabbitTemplate 集成 RabbitMQ?

    15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。 今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。 AMQP 规范与 RabbitM

    2024年02月02日
    浏览(36)
  • spring集成kafka并对消息进行监听

    需要依赖zookeeper,需提前启动 在server.properties文件中配置kafka连接zookeeper相关信息 在zookeeper.properties中配置zookeeper所需配置 kafka本地安装启动 pom文件 生产配置 消费者配置 创建topic工具类 生产业务 消费业务 消息接收类 监听类 业务处理 异步 同步 ONEWAY kafka消息发送方式有同步

    2024年02月03日
    浏览(31)
  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(46)
  • 【Spring Boot】集成Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月09日
    浏览(32)
  • springboot集成kafka详细步骤(发送及监听消息示例)

    1、本机的kafka环境配置,不再赘述 2、添加 pom 文件 3、配置application.yml 4、复写kafka的相关配置类:生产、消费相关配置 5、生产、消费的伪代码 6、测试消息发送 经过测试!

    2024年02月11日
    浏览(26)
  • MQTT 与 Kafka|物联网消息与流数据集成实践

    MQTT (Message Queuing Telemetry Transport) 是一种轻量级的消息传输协议,专为受限网络环境下的设备通信而设计。Apache Kafka 是一个分布式流处理平台,旨在处理大规模的实时数据流。 Kafka 和 MQTT 是实现物联网数据端到端集成的互补技术。通过结合使用 Kafka 和 MQTT,企业可以构建一个

    2024年02月16日
    浏览(27)
  • [kafka]kafka如何保证消息有序

    严格的说,kafka只能保证同一个分区内的消息存储的有序性。 这个问题并没有标准答案,面试官只是想看看你如何思考的。 kafka只能保证单partition有序,如果kafka要保证多个partition有序,不仅broker保存的数据要保持顺序,消费时也要按序消费。假设partition1堵了,为了有序,那

    2024年02月16日
    浏览(27)
  • 实战:Spring Cloud Stream集成兼容多消息中间件kafka、rabbitmq

    前面的博文我们介绍并实战演示了Spring Cloud Stream整合rabbitmq,其中主要介绍了如何使用和配置完成消息中间件的集成。但是,在实际的生产环境中可能会用到多个消息中间件,又或者是由于业务改变需要更换消息中间件,在这些情况下我们的Spring Cloud Stream框架可以完全兼容多

    2024年02月08日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包