spring boot学习第八篇:kafka监听消费

这篇具有很好参考价值的文章主要介绍了spring boot学习第八篇:kafka监听消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

为了实现监听器功能

pom.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hmblogs</groupId>
    <artifactId>hmblogs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hmblogs</name>
    <description>hmblogs</description>
    <properties>
        <java.version>8</java.version>
        <druid.version>1.2.8</druid.version>
        <log4jdbc.version>1.16</log4jdbc.version>
    </properties>
    <dependencies>
        <!-- druid数据源驱动 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- mybatis -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--Mysql依赖包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--lombok插件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--监控sql日志-->
        <dependency>
            <groupId>org.bgee.log4jdbc-log4j2</groupId>
            <artifactId>log4jdbc-log4j2-jdbc4.1</artifactId>
            <version>${log4jdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.9</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.7.2</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 application.yml文件内容如下:

server:
  port: 8081
  servlet.context-path: /

#配置数据源
spring:
  datasource:
    druid:
      db-type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
      url: jdbc:log4jdbc:mysql://${DB_HOST:localhost}:${DB_PORT:3306}/${DB_NAME:eladmin}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
      username: ${DB_USER:root}
      password: ${DB_PWD:123456}
  redis:
    host: localhost
    port: 6379
    password: heming
    database: 10

logback.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds">
    <!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
    <!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
    <!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
    <!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
    <contextName>logback</contextName>
    <property name="log.path" value="logs"></property>
    <property name="Console_Pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%logger{50}] - %msg%n"/>

    <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>${Console_Pattern}</Pattern>
            <!-- 设置字符集 -->
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- 时间滚动输出 level为 INFO 日志 -->
    <appender name="RollingFileBackend" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/hmblogs.log</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%logger{50}] - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 每天日志归档路径以及格式 -->
            <fileNamePattern>${log.path}/hmblogs/log-hmblogs-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录info级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>Info</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!--additivity:是否继承root节点,默认是true继承。默认情况下子Logger会继承父Logger的appender,
    也就是说子Logger会在父Logger的appender里输出。
    若是additivity设为false,则子Logger只会在自己的appender里输出,而不会在父Logger的appender里输出。-->
    <logger name="org.springframework" level="INFO" additivity="false">
        <appender-ref ref="Console"/>
        <appender-ref ref="RollingFileBackend"/>
    </logger>
    <logger name="org.mybatis" level="INFO"></logger>
    <logger name="org.hibernate.SQL" level="DEBUG"  additivity="false">
        <appender-ref ref="Console"/>
        <appender-ref ref="RollingFileBackend"/>
    </logger>
    <Logger name="org.apache.catalina" level="info"/>
    <Logger name="org.apache.tomcat.util" level="info"/>
    <!-- 从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF-->
    <root level="Info">
        <appender-ref ref="Console"/>
        <appender-ref ref="RollingFileBackend"/>
    </root>

</configuration>

BackendApplication.java文件内容如下:

package com.hmblogs.backend;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BackendApplication {

    public static void main(String[] args) {
        SpringApplication.run(BackendApplication.class, args);
    }

}

然后添加了kafkaConsumerListenerExample.java文件

package com.hmblogs.backend.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 *
 * @description:  kafka 消费者
 * @copyright: @Copyright (c) 2022
 * @company: hmblogs
 * @author: heming
 * @version: 1.0.0
 * @createTime: 2024-01-18 8:31
 */
@Component
@Slf4j
public class kafkaConsumerListenerExample {

    @KafkaListener(topics = "test", groupId = "0")
    public void consume(ConsumerRecord<?, ?> record) {
        Optional<?> value = Optional.ofNullable(record.value());
        // 进行消息处理逻辑
        log.info("print message: " + value);
    }
}


发到服务器上,启动hmblogs报错,截图如下:

spring boot学习第八篇:kafka监听消费,Spring Boot,spring boot,学习,kafka

Caused by: java.lang.TypeNotPresentException: Type org.springframework.kafka.listener.CommonErrorHandler not present

java.lang.ClassNotFoundException: org.springframework.kafka.listener.CommonErrorHandler

网上搜索资料,大部分讲的都是包冲突,在本地启动也是报这样的错,如下所示:

spring boot学习第八篇:kafka监听消费,Spring Boot,spring boot,学习,kafka

spring boot学习第八篇:kafka监听消费,Spring Boot,spring boot,学习,kafka

网上CSDN提出问题求助大佬,大佬建议spring boot项目可以不用给kafka相关依赖指定版本,然后我尝试了一下,问题解决。pom.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hmblogs</groupId>
    <artifactId>hmblogs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hmblogs</name>
    <description>hmblogs</description>
    <properties>
        <java.version>8</java.version>
        <druid.version>1.2.8</druid.version>
        <log4jdbc.version>1.16</log4jdbc.version>
    </properties>
    <dependencies>
        <!-- druid数据源驱动 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- mybatis -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--Mysql依赖包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--lombok插件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--监控sql日志-->
        <dependency>
            <groupId>org.bgee.log4jdbc-log4j2</groupId>
            <artifactId>log4jdbc-log4j2-jdbc4.1</artifactId>
            <version>${log4jdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.9</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>

        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>

        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 本地启动,没问题

spring boot学习第八篇:kafka监听消费,Spring Boot,spring boot,学习,kafka

然后发到我的云主机,启动OK

然后在kafka服务里发消息,参考spring boot学习第八篇:kafka-CSDN博客 

这篇博客的2.3,这里不再赘述。

发消息截图如下:

spring boot学习第八篇:kafka监听消费,Spring Boot,spring boot,学习,kafka

 然后查看hmblogs.log的日志

spring boot学习第八篇:kafka监听消费,Spring Boot,spring boot,学习,kafka

我是用tail -f hmblogs.log查看的。

spring boot学习第八篇:kafka监听消费,Spring Boot,spring boot,学习,kafka 问题解决了。kafka监听效果也实现了,即一旦有另外的人发消息到这个kafka服务的topic上,该监听器能监听到新发进来该topic的消息,并且消费。文章来源地址https://www.toymoban.com/news/detail-805431.html

到了这里,关于spring boot学习第八篇:kafka监听消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(45)
  • 【Spring进阶系列丨第八篇】Spring整合junit & 面向切面编程(AOP)详解

    ​ @ContextConfiguration注解需要注意的细节是: classes:指定的是主配置类的字节码 locations:指定xml文件的位置 ​ 首先来看一个问题,假如我现在有一个UserServiceImpl用户业务类,其中呢,有一个保存用户的方法,即: ​ 现在的需求是:我要在保存用户之前新增事务的功能,你

    2024年04月13日
    浏览(58)
  • Spring Boot中使用Kafka时遇到“构建Kafka消费者失败“的问题

    在使用Spring Boot开发应用程序时,集成Apache Kafka作为消息队列是一种常见的做法。然而,有时候在配置和使用Kafka时可能会遇到一些问题。本文将探讨在Spring Boot应用程序中使用Kafka时可能遇到的\\\"构建Kafka消费者失败\\\"错误,并提供解决方案。 错误描述: 当尝试构建Kafka消费者时

    2024年01月17日
    浏览(48)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(54)
  • 第八篇——Kafka Streams源码解读

    作者:禅与计算机程序设计艺术 Kafka Streams是一个开源分布式流处理平台,它可以让你轻松处理实时数据流。通过Kafka Streams API可以轻松创建、部署和运行复杂的实时流处理应用程序。虽然Kafka Stream提供了许多高级功能,但其底层原理却十分简单易懂,在学习之余,我们还是需

    2024年02月07日
    浏览(42)
  • 【Spring Boot】Spring—加载监听器

    前几天的时候,项目里有一个需求,需要一个开关控制代码中是否执行一段逻辑,于是理所当然的在yml文件中配置了一个属性作为开关,再配合nacos就可以随时改变这个值达到我们的目的,yml文件中是这样写的: 程序中的代码也很简单,大致的逻辑就是下面这样,如果取到的

    2024年02月08日
    浏览(44)
  • Spring Boot 监听器详解

    Spring Boot 3.x系列文章 Spring Boot 2.7.8 中文参考指南(一) Spring Boot 2.7.8 中文参考指南(二)-Web Spring Boot 源码阅读初始化环境搭建 Spring Boot 框架整体启动流程详解 Spring Boot 系统初始化器详解 Spring Boot 监听器详解 通过前面的几篇文章,我们都能看到 SpringApplicationRunListener ,SpringApp

    2024年02月08日
    浏览(63)
  • 【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

    本文主要有以下内容: 简单消息的发送 顺序消息的发送 RocketMQTemplate的API介绍 环境搭建: RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示: 在 Spring boot 项目中引入 RocketMQ 依赖: 在application.yml增加相关配置: 在 Spring Boot 中使用RocketM

    2024年02月14日
    浏览(50)
  • spring集成kafka并对消息进行监听

    需要依赖zookeeper,需提前启动 在server.properties文件中配置kafka连接zookeeper相关信息 在zookeeper.properties中配置zookeeper所需配置 kafka本地安装启动 pom文件 生产配置 消费者配置 创建topic工具类 生产业务 消费业务 消息接收类 监听类 业务处理 异步 同步 ONEWAY kafka消息发送方式有同步

    2024年02月03日
    浏览(48)
  • Apche Kafka + Spring的消息监听容器

    消息的接收:可以通过配置MessageListenerContainer并提供消息侦听器或使用@KafkaListener注释来接收消息。本章我们主要说明通过配置MessageListenerContainer并提供消息侦听器的方式接收消息。 当使用消息监听容器时,就必须提供一个监听器来接收数据。目前有八个支持消息侦听器的接

    2024年02月12日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包