kafka 命令脚本说明以及在java中使用

这篇具有很好参考价值的文章主要介绍了kafka 命令脚本说明以及在java中使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、命令行使用

1.1、topic 命令

1、关于topic,这里用window 来示例

bin\windows\kafka-topics.bat

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

2、创建 first topic,五个分区,1个副本

bin\windows\kafka-topics.bat  --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic first

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot
3、查看当前服务器中的所有 topic

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

4、查看 first 主题的详情

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic first

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot
5、修改分区数**(注意:分区数只能增加,不能减少)**

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic first --partitions 6

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

6、删除 topic,该操作在winodw,会出现文件授权问题,日志可以在kafka的启动命令窗口中查看,只需要修改文件权限即可,如果出现这个问题,我们需要清空之前配置的 datakafka-logs 这两个文件中的内容,再次重新启动即可。

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic first

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

1.2、生产者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-producer.bat

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

2、发送消息,这里发送了2次的数据,第一次是hello,第二次是world

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic first

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

1.3、消费者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-consumer.bat

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot
kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

2、接受消息,因为前面我们在发送消息的时候,消费者没有启动,所以第一次发的数据这里是收不到的,并没有存储到topic中

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

3、把主题中所有的数据都读取出来(包括历史数据),可以看到我们获取到了从消费者没有上线之前到上线之后的所有数据,一共6条。

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic first

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

1.4、脚本说明

项目 Value
connect-standalone.sh 用于启动单节点的Standalone模式的Kafka Connect组件。
connect-distributed.sh 用于启动多节点的Distributed模式的Kafka Connect组件。
kafka-acls.sh 脚本用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限。
kafka-delegation-tokens.sh 用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。
kafka-topics.sh 用于管理所有TOPIC。
kafka-console-producer.sh 用于生产消息。
kafka-console-consumer.sh 用于消费消息。
kafka-producer-perf-test.sh 用于生产者性能测试
kafka-consumer-perf-test.sh 用于消费者性能测试
kafka-delete-records.sh 用于删除Kafka的分区消息,由于Kafka有自己的自动消息删除策略,使用率不高。
kafka-dump-log.sh 用于查看Kafka消息文件的内容,包括消息的各种元数据信息、消息体数据。
kafka-log-dirs.sh 用于查询各个Broker上的各个日志路径的磁盘占用情况。
kafka-mirror-maker.sh 用于在Kafka集群间实现数据镜像。
kafka-preferred-replica-election.sh 用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作。
kafka-reassign-partitions.sh 用于执行分区副本迁移以及副本文件路径迁移。
kafka-run-class.sh 用于执行任何带main方法的Kafka类。
kafka-server-start.sh 用于启动Broker进程。
kafka-server-stop.sh 用于停止Broker进程。
kafka-streams-application-reset.sh 用于给Kafka Streams应用程序重设位移,以便重新消费数据。
kafka-verifiable-producer.sh 用于测试验证生产者的功能。
kafka-verifiable-consumer.sh 用于测试验证消费者功能。
trogdor.sh 是Kafka的测试框架,用于执行各种基准测试和负载测试。
kafka-broker-api-versions.sh 脚本主要用于验证不同Kafka版本之间服务器和客户端的适配性

1.5、关闭kafka

1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱

如果出现数据错错乱,最简单的方法就是清空data和kafka-logs 这两个文件下的内容,重新启动即可

2、关闭

.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat

kafka 命令脚本说明以及在java中使用,MQ,kafka,java,spring boot

1.6、选择分区数及kafka性能测试

1、主要工具是 kafka-producer-perf-test.batkafka-consumer-perf-test.bat 两个脚本,可以参考 kafka如何选择分区数及kafka性能测试

二、java 使用

2.1、使用原生客户端

1、依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

2、发送和消费消息,具体代码如下:

public class KafkaConfig {
 
    public static void main(String[] args) {
        // 声明主题
        String topic = "first";
        // 创建消费者
        Properties consumerConfig = new Properties();
        consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);
        // 订阅主题并循环拉取消息
        kafkaConsumer.subscribe(Arrays.asList(topic));
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(record.value());
                    }
                }
            }
        }).start();
        // 创建生产者
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(producerConfig);
        // 给主题发送消息
        producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));
    }
}

2.2、使用springBoot

1、依赖

 <!-- 不使用kafka的原始客户端,使用spring集成的,这样比较方便  -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!-- 可以不用指定,springBoot 会帮我们选择,如果有特殊需求,可以更改 -->
            <!--            <version>3.0.2</version>-->
        </dependency>

2、配置文件

server:
  port: 7280
  servlet:
    context-path: /thermal-emqx2kafka
  shutdown: graceful

spring:
  application:
    name: thermal-api-demonstration-tdengine
  lifecycle:
    timeout-per-shutdown-phase: 30s
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher  # 不然spring boot 2.6以后的版本 和 swagger 会出现 问题,可以参考 https://blog.csdn.net/qq_41027259/article/details/125747298
  kafka:
    bootstrap-servers: 127.0.0.1:9092  # 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094  连接的 Kafka Broker 主机名称和端口号
    #properties.key-serializer: # 用于配置客户端的附加属性,对于生产者和消费者都是通用的,。 org.apache.kafka.common.serialization.StringSerializer
    producer: # 生产者
      retries: 3 # 重试次数
      #acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      #batch-size: 16384 # 一次最多发送数据量
      #buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者
      group-id: test-consumer-group #默认的消费组ID,在Kafka的/config/consumer.properties中查看和修改
      #enable-auto-commit: true # 是否自动提交offset
      #auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
      #auto-offset-reset: latest  #earliest,latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、发送消息

package cn.jt.thermalemqx2kafka.kafka.controller;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/mock")
    public String sendKafkaMessage() {
        Map<String, Object> data = new HashMap<>(2);
        data.put("id", 1);
        data.put("name", "gkj");
        kafkaTemplate.send("first", JSON.toJSONString(data));
        return "ok";
    }
}

4、接受消息文章来源地址https://www.toymoban.com/news/detail-694563.html

package cn.jt.thermalemqx2kafka.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@Component
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(topics = "first")
    private void handler(String content) {
        log.info("consumer received: {} ", content);
    }
}

到了这里,关于kafka 命令脚本说明以及在java中使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(45)
  • kafka安装说明以及在项目中使用

    1、 本次实验,采用kafka版本为 3.4.0 2、我们首先需要了解一下,一个 Kafka 集群是由下列几种类型的节点构成的,它们充当着不同的作用: Broker 节点 :即 代理节点 ,是 Kafka 中的工作节点,充当消息队列的角色, 负责储存和处理消息 ,每个 Broker 都是一个独立的 Kafka 服务器

    2024年02月12日
    浏览(56)
  • 小程序商城免费搭建之java商城 电子商务Spring Cloud+Spring Boot+二次开发+mybatis+MQ+VR全景+b2b2c bbc

     在数字化时代,电商行业正经历着前所未有的变革。鸿鹄云商的saas云平台以其独特的架构和先进的理念,为电商行业带来了全新的商业模式和营销策略。该平台涉及多个平台端,包括平台管理、商家端、买家平台、微服务平台等,涵盖了pc端、手机端、h5/公众号、小程序、

    2024年01月19日
    浏览(63)
  • spring-boot-starter-aop及其使用场景说明

    如今,AOP(Aspect Oriented Programming)已经不是什么崭新的概念了,在经历了代码生成、动态代理、字节码增强甚至静态编译等不同时代的洗礼之后,Java 平台上的 AOP 方案基本上已经以 SpringAOP 结合 AspectJ 的方式稳固下来(虽然大家依然可以自己通过各种字节码工具偶尔“打造一

    2024年02月03日
    浏览(80)
  • Spring Boot - 结合 Redis 使用 Lua脚本

    在Spring Boot中整合Redis并使用Lua脚本: 添加Spring Boot和Redis的依赖: 首先,在Spring Boot项目的 pom.xml 文件中添加Spring Boot和Spring Data Redis的依赖: 配置Redis连接: 在 application.properties 或 application.yml 中配置Redis的连接信息,以及 redis 配置: RedisConfig.java

    2024年02月08日
    浏览(43)
  • 【消息中间件MQ系列】Spring整合kafka并设置多套kafka配置

            圣诞节的到来,程序员不会收到圣诞老人的🎁,但可以自己满足一下自己,所以,趁着有时间,就记录一下这会儿撸了些什么代码吧!!!         因为业务原因,需要在系统内新增其他的kakfa配置使用,所以今天研究的是怎么在系统内整合多套kafka配置使用。

    2024年02月01日
    浏览(96)
  • 使用Spring Boot和Kafka实现消息订阅和发送

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月11日
    浏览(41)
  • 使用Spring Boot和Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月11日
    浏览(42)
  • 使用 Spring Boot 整合 Kafka:实现高效的消息传递

    Kafka 是一种流处理平台,用于在分布式系统中处理高吞吐量的数据流。它是一种基于发布订阅模式的消息系统,能够处理来自多个应用程序的数据流。Kafka 具有高度的可扩展性、可靠性和性能,使得它成为处理大数据的流行选择。 Spring Boot 是一种开源框架,用于简化 Java 应用

    2024年02月14日
    浏览(48)
  • 使用Spring Boot集成中间件:Kafka的高级使用案例讲解

    在实际应用中,Kafka作为一种强大的分布式消息系统,广泛应用于实时数据处理和消息传递。本文将通过一个全面的使用案例,详细介绍如何使用Spring Boot集成Kafka,并展示其在实际场景中的应用。 在开始之前,我们需要确保已经完成以下准备工作: 安装并启动Kafka集群 创建

    2024年02月01日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包