Kafka是什么,以及如何使用SpringBoot对接Kafka

这篇具有很好参考价值的文章主要介绍了Kafka是什么,以及如何使用SpringBoot对接Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
架构必备能力——kafka的选型对比及应用场景
Kafka存取原理与实现分析,打破面试难关
防止消息丢失与消息重复——Kafka可靠性分析及优化实践



kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用SpringBoot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。


5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLis {

    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践

三、启动与验证

首先自然是启动 Kafka ,怎么启动可参考 《上手第一关,手把手教你安装kafka与可视化工具kafka-eagle》,然后是启动我们的Spring Boot项目

kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践

最后检查我们的项目日志:

kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

  • producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
  • defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
  • messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主题发送一条消息。
  • send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
  • inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践
kafka对接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka实践

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClassConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结

今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构文章来源地址https://www.toymoban.com/news/detail-712792.html

到了这里,关于Kafka是什么,以及如何使用SpringBoot对接Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot对接阿里云OSS上传文件以及回调(有坑)

    今天在对接阿里云OSS对象存储, 把这过程记录下来 阿里云的内容很多,文档是真的难找又难懂 本文主要是用的PostObject API 加上 Callback参数 PostObject - https://help.aliyun.com/document_detail/31988.html?spm=a2c4g.31989.0.0 Callback - https://help.aliyun.com/document_detail/31989.html?spm=a2c4g.31988.0.0 前端向后

    2024年02月11日
    浏览(45)
  • Springboot实战14 消息驱动:如何使用 KafkaTemplate 集成 Kafka?

    从今天开始,我们将进入 Spring Boot 中另一个重要话题的讨论,即消息通信。 消息通信是 Web 应用程序中间层组件中的代表性技术体系,主要用于构建复杂而又灵活的业务流程。在互联网应用中,消息通信被认为是实现系统解耦和高并发的关键技术体系。本节课我们将在 Spri

    2024年02月04日
    浏览(27)
  • 什么是Flink CDC,以及如何使用

    数据库中的CDC(Change Data Capture,变更数据捕获)是一种用于实时跟踪数据库中数据变化的技术。CDC的主要目的是在数据库中捕获增量数据,以便在需要时可以轻松地将这些数据合并到其他系统或应用程序中。CDC在数据库管理、数据同步、数据集成和数据备份等方面具有广泛的

    2024年02月11日
    浏览(35)
  • 【JAVA】为什么要使用封装以及如何封装

    个人主页:【😊个人主页】 系列专栏:【❤️初识JAVA】 Java的封装指的是在一个类中将数据和方法进行封装,使其可以保护起来,只能在该类内部访问,而不允许外部直接访问和修改。这是Java面向对象编程的三个基本特性之一,另外两个是继承和多态。在此之前我们已经学

    2024年02月08日
    浏览(43)
  • 实战指南,SpringBoot + Mybatis 如何对接多数据源

    MyBatis缓存原理 Mybatis plugin 的使用及原理 MyBatis+Springboot 启动到SQL执行全流程 数据库操作不再困难,MyBatis动态Sql标签解析 从零开始,手把手教你搭建Spring Boot后台工程并说明 Spring框架与SpringBoot的关联与区别 Spring监听器用法与原理详解 Spring事务畅谈 —— 由浅入深彻底弄懂

    2024年02月12日
    浏览(39)
  • 【机器学习 | 深度学习】Colab是什么?以及如何使用它?

    Colaboratory(简称为Colab)是由Google开发的一种基于云端的交互式笔记本环境。它提供了免费的计算资源(包括CPU、GPU和TPU),可让用户在浏览器中编写和执行代码,而无需进行任何配置和安装。 Colab的目标是使机器学习和数据科学的工作更加便捷、灵活和可共享。 下面是Col

    2024年02月09日
    浏览(31)
  • 什么是OpenVino?以及如何使用OpenVino运行yolo

    目录 Openvino简介 如何使用它? 构建源代码 Openvino IR模型 第一个Openvino示例 C语言示例 C++示例 使用OpenVino跑Yolo模型 Openvino 是由 Intel 开发的专门用于优化和部署人工智能推理的半开源的工具包,主要用于对深度 推理做优化 。 Openvino内部集成了 Opencv 、 TensorFlow 模块,除此之外

    2023年04月26日
    浏览(47)
  • Logback日志框架使用详解以及如何Springboot快速集成

      日志系统是用于记录程序的运行过程中产生的运行信息、异常信息等,一般有8个级别,从低到高为All Trace Debug Info Warn Error Fatal OFF off 最高等级,用于关闭所有日志记录 fatal 指出每个严重的错误事件将会导致应用程序的退出。 error 指出虽然发生错误事件,但仍然不影响系统

    2024年02月07日
    浏览(31)
  • Spring Boot中的SimpMessagingTemplate是什么,原理,以及如何使用

    SimpMessagingTemplate是Spring Framework中的一个类,用于向WebSocket客户端发送消息。在Spring Boot应用程序中,可以使用SimpMessagingTemplate来实现WebSocket通信的消息发送功能。本文将介绍SimpMessagingTemplate的原理和使用方法。 SimpMessagingTemplate是Spring Framework中的一个类,用于向WebSocket客户端

    2024年02月09日
    浏览(38)
  • Spring Boot中的@EnableWebSocketMessageBroker注解是什么,原理,以及如何使用

    WebSocket是一种在Web浏览器和Web服务器之间进行双向通信的技术。在传统的HTTP通信中,客户端向服务器发送请求,服务器响应请求,然后关闭连接。而在WebSocket中,客户端和服务器之间的连接始终保持打开状态,可以随时互相发送消息,实现实时通信。 Spring Boot提供了对WebSo

    2024年02月12日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包