kafka:java集成 kafka(springboot集成、客户端集成)

这篇具有很好参考价值的文章主要介绍了kafka:java集成 kafka(springboot集成、客户端集成)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

摘要

对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。

一、springboot集成kafka

具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

kafka:java集成 kafka(springboot集成、客户端集成)

1、加入依赖,spring-boot-starter-web和spring-kafka 的版本号可以看它们依赖的spring版本是否一致,这里pom依赖如下:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.9</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.6</version>
        </dependency>

2、添加application.yml配置,具体如下:

server:
  port: 8087
spring:
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher
  kafka:
    bootstrap-servers: 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094
    consumer:
      properties:
        group:
          id: boot-kafka

3、发送消息,由于KafkaTemplate是自动装配的,所以只要在spring的bean里注入KafkaTemplate发送消息即可,具体如下:

package com.longqi.bootkafka.controller;

import com.longqi.bootkafka.entity.MessageParam;
import com.longqi.bootkafka.entity.Wrapper;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.validation.Valid;

/**
 * <p>
 * 测试 前端控制器
 * </p>
 * @author LongQi
 * @since 2021-06-23
 */

@Slf4j
@RestController
@RequestMapping("/test")
@Api(value = "TestController", tags = {"测试 API"})
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Boolean isSend = true;

    @PostMapping("/kafka/sendMessage")
    @ApiOperation(httpMethod = "POST", value = "发送kafka告警消息", response = Wrapper.class)
    public Wrapper sendKafkaMessage(@Valid @ApiParam("参数") @RequestBody MessageParam param) {
        kafkaTemplate.send(param.getTopic(), param.getMessage());
        return Wrapper.ok(true);
    }

}

这里用参数{"message": "asd54a6d46a4ds","topic": "device-alarm-test"}进行测试,会报如下日志:

kafka:java集成 kafka(springboot集成、客户端集成)

发现会报警告:[Producer clientId=producer-1] Error while fetching metadata with correlation id 34 : {device-alarm-test=LEADER_NOT_AVAILABLE},获取主题元数据错误,这个可以忽略,查找元数据失败,kafka默认会自动创建主题的,后续再次发送消息,是不会报这个错误的。

查看可视化工具EFAK,发现主题device-alarm-test是自动创建成功,分区数是kafka的集群配置service.properties里配置的分区9,具体如下:

kafka:java集成 kafka(springboot集成、客户端集成)
kafka:java集成 kafka(springboot集成、客户端集成)

可以看到,其中一个分区保存了这个消息,logsize变成了1,说明这个消息是发送成功的。另外也可以看到主题的各分区主备消息所在的节点是不一样的。

4、接收消息,接收消息也很简单,只要在spring的bean里使用KafkaListener注解即可,具体如下:

kafka:java集成 kafka(springboot集成、客户端集成)

可视化工具也能看到该主题该消费者9个分区的消费情况,具体如下:

kafka:java集成 kafka(springboot集成、客户端集成)

logSize为存入分区parttion消息数量,Offset为消费的偏移量(已消费的数量),Lag为未消费的数量(积压的数量),Owner为消费者,目前可以看到消费者为同一个,即只有1个线程在消费这9个分区的消息。

二、客户端集成kafka

直接使用kafka客户端,建议使用最新版的客户端,毕竟没有其他框架版本限制,能用最新的就用最新的,毕竟新的一般性能强也修复了bug。好比23年2月份出现的kafka安全漏洞:远程代码执行漏洞CVE-2023-25194,对现在最新版3.4.0无效,对以前大部分版本就有效。

1、添加依赖,具体如下:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

2、发送和消费消息,具体代码如下:

package com.longqi.bootkafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author LongQi
 * @projectName boot-integration
 * @description: kafka配置
 * @date 2023/3/13 14:42
 */

public class KafkaConfig {

    public static void main(String[] args) {
        // 声明主题
        String topic = "device-alarm-test";
        // 创建消费者
        Properties consumerConfig = new Properties();
        consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);
        // 订阅主题并循环拉取消息
        kafkaConsumer.subscribe(Arrays.asList(topic));
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(record.value());
                    }
                }
            }
        }).start();
        // 创建生产者
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(producerConfig);
        // 给主题发送消息
        producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));
    }
}

最后可以看到打印消息如下:

kafka:java集成 kafka(springboot集成、客户端集成)

成功接收到消息并打印文章来源地址https://www.toymoban.com/news/detail-421243.html

到了这里,关于kafka:java集成 kafka(springboot集成、客户端集成)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot集成WebSocket实现客户端与服务端通信

    话不多说,直接上代码看效果! 一、服务端: 1、引用依赖 2、添加配置文件 WebSocketConfig 3、编写WebSocket服务端接收、发送功能   声明接口代码:   实现类代码: 4、如果不需要实现客户端功能,此处可选择前端调用,奉上代码 二、客户端: 1、引用依赖 2、自定义WebSocket客

    2024年01月23日
    浏览(54)
  • SpringBoot集成Elasticsearch客户端(新旧版本)(2023-01-28)

    第一章 SpringBoot集成ElasticSearch(2023-01-28) 例如:业务中需要使用es,所以做一些客户端选型,熟悉一下基本的操作,所以记录这篇博客,有关概念理论性的文章还在整理过程中,后续会整理个系列 Spring认证中国教育管理中心-Spring Data Elasticsearch教程一 SpringData集成Elasticsearch Sp

    2024年02月07日
    浏览(74)
  • SpringBoot集成WebSocket实现客户端与服务端长连接通信

    场景: 1、WebSocket协议是用于前后端长连接交互的技术,此技术多用于交互不断开的场景。特点是连接不间断、更轻量,只有在关闭浏览器窗口、或者关闭浏览器、或主动close,当前会话对象才会关闭。 2、相较于 Http/Https 通信只能由客户端主动发起请求,而 Socket 通信不仅能

    2024年02月02日
    浏览(59)
  • Springboot 集成WebSocket作为客户端,含重连接功能,开箱即用

    使用演示 只需要init后调用sendMessage方法即可,做到开箱即用。内部封装了失败重连接、断线重连接等功能。 基于Springboot工程 引入websocket依赖 开箱即用的工具类

    2024年02月04日
    浏览(59)
  • kafka之java客户端实战

            Kafka提供了两套客户端API, HighLevel API和LowLevel API 。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,

    2024年01月17日
    浏览(61)
  • springboot集成webstock实战:服务端数据推送数据到客户端实现实时刷新

        之前介绍过springboot集成webstock方式,具体参考: springboot集成websocket实战:站内消息实时推送 这里补充另外一个使用webstock的场景,方便其他同学理解和使用,废话不多说了,直接开始!简单介绍一下业务场景:     现在有一个投票活动,活动详情中会显示投票活动的参与人数、访

    2024年02月08日
    浏览(98)
  • SpringBoot集成Milo库实现OPC UA客户端:连接、遍历节点、读取、写入、订阅与批量订阅

    前面我们搭建了一个本地的 PLC 仿真环境,并通过 KEPServerEX6 读取 PLC 上的数据,最后还使用 UAExpert 作为OPC客户端完成从 KEPServerEX6 这个OPC服务器的数据读取与订阅功能。在这篇文章中,我们将通过 SpringBoot 集成 Milo 库实现一个 OPC UA 客户端,包括连接、遍历节点、读取、写入

    2024年02月09日
    浏览(63)
  • 使用Kafka客户端(kafka-clients)的Java API操作Kafka的Topic

    记录 :460 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(74)
  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(44)
  • [Kafka集群] 配置支持Brokers内部SSL认证\外部客户端支持SASL_SSL认证并集成spring-cloud-starter-bus-kafka

    目录 Kafka 集群配置 准备 配置流程 Jaas(Java Authentication and Authorization Service )文件 zookeeper 配置文件 SSL自签名 启动zookeeper集群 启动kafka集群  spring-cloud-starter-bus-kafka 集成 下载统一版本Kafka服务包至三台不同的服务器上 文章使用版本为  kafka_2.13-3.5.0.tgz 下载地址 jdk版本 为 Ado

    2024年02月04日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包