【SpringBoot系列】SpringBoot整合Kafka(含源码)

这篇具有很好参考价值的文章主要介绍了【SpringBoot系列】SpringBoot整合Kafka(含源码)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


【SpringBoot系列】SpringBoot整合Kafka(含源码),SpringBoot,中间件,spring boot,kafka,后端

前言

在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。

它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。

在众多的消息队列中,Kafka 是一个非常出色的选择。

它能够处理大量的实时数据,并且提供了强大的持久化能力。

在本文中,我们将会探讨如何在 SpringBoot 中整合 Kafka。


什么是Kafka?

Apache Kafka 是一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量、可扩展性和容错性而闻名。它是一个基于发布/订阅模式的消息系统,通常用于大型实时数据流处理应用。

Kafka 的主要组件包括:

  • Producer:负责发布消息到 Kafka 服务器。
  • Broker:是 Kafka 服务器实例,负责消息的存储、接收和发送。
  • Consumer:从 Kafka 服务器读取消息。
  • Topic:消息的类别或者说是消息的标签,Producer 将消息发布到特定的 Topic,Consumer 从特定的 Topic 读取消息。

Kafka 可以在分布式系统中用于构建实时流数据管道,它可以在系统或应用之间可靠地获取数据。此外,Kafka 可以和 Apache Storm、Apache Hadoop、Apache Spark 等进行集成,用于大数据处理和分析。


Kafka的应用场景?

日志收集:

一个公司可能有很多服务器,每个服务器上运行着很多服务,Kafka 可以用来实现这些服务的日志收集功能。各服务的日志分别发送到 Kafka 的不同 Topic 中。

消息系统:

Kafka 能够作为一个大规模的消息处理系统,各生产者将消息发送到 Kafka,消费者从 Kafka 中读取消息进行处理。

用户活动跟踪:

Kafka 也常用于用户活动跟踪和实时分析。例如,用户的点击、搜索等行为可以实时写入到 Kafka,然后进行实时或者离线分析。

在 Kafka 上可以进行实时的流处理。例如,使用 Apache Storm 集成 Kafka 来进行实时的数据处理。

指标和日志聚合:

统计数据和监控数据也是 Kafka 的一个重要应用场景。例如,通过 Kafka 可以收集各种分布式应用的数据,然后进行统一的处理和分析。

事件源:

Kafka 可以作为大规模事件处理的源头,例如,用户的行为、系统的状态等都可以作为事件,通过 Kafka 进行分发处理。


示例

版本依赖
模块 版本
SpringBoot 3.1.0
JDK 17
代码
KafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaReceiver listener() {
        return new KafkaReceiver();
    }

}
KafkaSender
@Component
@Slf4j
public class KafkaSender {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, String key, String data) {
        //发送消息
        CompletableFuture<SendResult<String, Object>> completable = kafkaTemplate.send(topic, key, data);
        completable.whenCompleteAsync((result, ex) -> {
            if (null == ex) {
                log.info(topic + "生产者发送消息成功:" + result.toString());
            } else {
                log.info(topic + "生产者发送消息失败:" + ex.getMessage());
            }
        });
    }
}
KafkaReceiver
@Component
@Slf4j
public class KafkaReceiver {
    /**
     * 下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用","隔开
     */
    @KafkaListener(topics = {"testTopic"})
    public void receive(ConsumerRecord<?, ?> record){
        log.info("消费者收到的消息key: " + record.key());
        log.info("消费者收到的消息value: " + record.value().toString());
    }
}
KafkaController
/**
 * kafka 测试接口
 */
@RestController
public class KafkaController {
    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/sendMessageToKafka")
    public String sendMessageToKafka() {
        Map<String, String> messageMap = new HashMap();
        messageMap.put("message", "hello world!");
        ObjectMapper objectMapper = new ObjectMapper();
        String data = null;
        try {
            data = objectMapper.writeValueAsString(messageMap);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        String key = String.valueOf(UUID.randomUUID());
        //kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)
        kafkaSender.send("testTopic", key, data);
        return "ok";
    }
}
测试

http://127.0.0.1:8080/sendMessageToKafka

【SpringBoot系列】SpringBoot整合Kafka(含源码),SpringBoot,中间件,spring boot,kafka,后端


【SpringBoot系列】SpringBoot整合Kafka(含源码),SpringBoot,中间件,spring boot,kafka,后端


遇见问题

Error connecting to node xxxxxx:9092 (id: 0 rack: null)

Error connecting to node iZbp127a9vpra4v3kmkkmzZ:9092 (id: 0 rack: null)

解决方案

修改本地物理机hosts文件。文件目录:C:\Windows\System32\drivers\etc

【SpringBoot系列】SpringBoot整合Kafka(含源码),SpringBoot,中间件,spring boot,kafka,后端

新增 xx.xx.xx.xx iZbp127a9vpra4v3kmkkmzZ

如果没生效,则需要重启系统


总结

通过上述的步骤,我们已经成功地在 SpringBoot 中整合了 Kafka。

这使得我们的应用程序能够在不同的服务之间传递消息,而不需要担心消息的丢失。

我们也看到,通过使用 SpringBoot,我们可以非常轻松地完成这个过程。

希望这篇文章能够帮助你在自己的项目中更好地使用 Kafka。


源码获取

如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+Kafka"即可获得


写在最后

感谢您的支持和鼓励! 😊🙏

如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot, spring cloud等系列文章,一系列干货随时送达!

【SpringBoot系列】SpringBoot整合Kafka(含源码),SpringBoot,中间件,spring boot,kafka,后端文章来源地址https://www.toymoban.com/news/detail-752770.html

到了这里,关于【SpringBoot系列】SpringBoot整合Kafka(含源码)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot整合activiti5,达梦数据库,mybatis中间件

    由于工作流引擎不支持达梦数据库以及国产中间件,所以我们引入的时候会报错,这个时候就需要去改造代码和配置文件。各种文档和资料查找一天,现在对这个问题进行解决了。 1.查看网上的各类教程,手动将源码复制粘贴出来,进行修改。这方面可以自行去查找对应文档

    2024年02月14日
    浏览(40)
  • 【分布式技术专题】「OSS中间件系列」Minio的文件服务的存储模型及整合Java客户端访问的实战指南

    Minio的元数据 数据存储 MinIO对象存储系统没有元数据数据库,所有的操作都是对象级别的粒度的,这种做法的优势是: 个别对象的失效,不会溢出为更大级别的系统失效。 便于实现\\\"强一致性\\\"这个特性。此特性对于机器学习与大数据处理非常重要。 数据管理 元数据与数据一起

    2024年02月11日
    浏览(59)
  • 【中间件】消息中间件之Kafka

    一、概念介绍 Apache Kafka是一个分布式流处理平台,用于构建实时数据管道和流应用。它可以处理网站、应用或其他来源产生的大量数据流,并能实时地将这些数据流传输到另一个系统或应用中进行处理。 核心概念: Topic(主题) :消息的分类,用于区分不同的业务消息。

    2024年01月20日
    浏览(67)
  • 中间件 kafka

    Kafka(Apache Kafka)是一个非常流行的开源分布式流数据平台。它最初由LinkedIn开发,后来捐赠给了Apache基金会,并成为顶级项目。Kafka被设计用于处理实时数据流,具有高吞吐量、可扩展性和持久性。 Kafka 的主要特点和用途包括: 发布-订阅模型: Kafka 提供了一种发布-订阅(

    2024年02月13日
    浏览(50)
  • 大数据中间件——Kafka

    Kafka安装配置 首先我们把kafka的安装包上传到虚拟机中: 解压到对应的目录并修改对应的文件名: 首先我们来到kafka的config目录,我们第一个要修改的文件就是server.properties文件,修改内容如下: 主要修改三个部分,一个是唯一标识id,kafka的文件存储路径,一个是zookeeper的节

    2024年02月07日
    浏览(46)
  • 消息中间件 —— 初识Kafka

    1.1.1、为什么要有消息队列? 1.1.2、消息队列 消息 Message 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。 队列 Queue 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素(FIFO)。 入队、出

    2024年02月13日
    浏览(53)
  • 中间件: Kafka安装部署

    下载二进制包 修改配置 启动 按照单机部署方式启动多个Zookeeper与broker节点。 修改config/server.properties配置: broker.id 每个节点唯一 zookeeper.connect: 改成zookeeper节点 查看集群状态:

    2024年02月12日
    浏览(44)
  • 消息中间件(二)——kafka

    在大数据中,会使用到大量的数据。面对这些海量的数据,我们一是需要做到能够 收集 这些数据,其次是要能够 分析和处理 这些海量数据。在此过程中,需要一套消息系统。 Kafka专门为分 布式高吞吐量 系统设计。作为一个消息代理的替代品,Kafka往往做的比其他消息中间

    2024年02月07日
    浏览(58)
  • 中间件(三)- Kafka(二)

    6.1 Kafka的高效读写 顺序写磁盘 Kafka的producer生产数据,需要写入到log文件中,写的过程是追加到文件末端,顺序写的方式,官网有数据表明,同样的磁盘,顺序写能够到600M/s,而随机写只有200K/s,这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时

    2024年02月07日
    浏览(39)
  • 消息中间件之Kafka(一)

    高性能的消息中间件,在大数据的业务场景下性能比较好,kafka本身不维护消息位点,而是交由Consumer来维护,消息可以重复消费,并且内部使用了零拷贝技术,性能比较好 Broker持久化消息时采用了MMAP的技术,Consumer拉取消息时使用的sendfile技术 Kafka是最初由Linkedin公司开发,

    2024年01月20日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包