kafka常见命令介绍和使用

这篇具有很好参考价值的文章主要介绍了kafka常见命令介绍和使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

首先通过这个命令什么也不加参数可以看到参数的详解

./kafka-topics.sh

创建一个topic基本参数
连接kafka : --zookeeper
操作一个topic : --topic
对一个topic进行什么样的操作?增–create删–delete改–alter查–describe
指定分区数:–partitions
指定副本个数:–replication-factor
1、创建一个test0主题并指定分区数1副本数1

./kafka-topics.sh  --zookeeper 192.168.124.8:2181 --topic test0 --create --replication-factor 1 --partitions 1

2、查看都有哪些主题

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --list

3、查看主题test0的详细信息

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --describe

4、修改分区为3 分区数只能增加不能减少!

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --alter --partitions 3

5、另外这里不能通过命令行的方式去修改副本

./kafka-topics.sh --zookeeper 192.168.124.8:2181 --topic test0 --alter --replication-factor 3

6、发送消息到topic

./kafka-console-producer.sh --broker-list 192.168.124.8:9092 --topic test0

7、消费者查看消息

# 增量消费数据,以前发送的不能读取到
./kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic message 
# --from-beginning 读取历史消息
./kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic message --from-beginning

主题创建

./kafka-topics.sh  --zookeeper 192.168.124.8:2181 --topic message --create --replication-factor 1 --partitions 1 

生产者

kafka生产者发送消息

添加依赖

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
</dependency>
// 简单发送数据
    @Test
    void SimpleSendData(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.8:9092");
        // 指定key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // create producer 我们写入 hello 的时候 没有key 实际key="" value="hello" 所以都是String 对应下面的K, V
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);

        //简单消息发送
        kafkaProducer.send(new ProducerRecord<>("message", "hello world "));
        // close
        kafkaProducer.close();
    }

进入容器消费者查看消息是否发送成功

docker exec -it kafka /bin/bash
cd /opt/kafka_2.13-2.8.1/bin
# 消费者 消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.124.8:9092 --topic message --from-beginning

发现消息正常消费。

带有回调函数发送消息

	@Test
    void testProducerCallback(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.8:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);

        // 也可以定义一个类实现Callback接口
        kafkaProducer.send(new ProducerRecord<>("message", "hello world  exec callback"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception==null){// 没有异常发送成功
                    System.out.println("topic :" +metadata.topic());
                    System.out.println("分区partition :" +metadata.partition());
                    /*
                        topic :message
                        分区partition :0
                     */
                }else {
                    // 打印异常信息
                    exception.printStackTrace();
                }
            }
        });

        // close
        kafkaProducer.close();
    }

lombda简化写法

@Test
    void testProducerCallbacklombda(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.8:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);

        // 也可以定义一个类实现Callback接口
        kafkaProducer.send(new ProducerRecord<>("message", "hello world  exec callback2"), ((metadata, exception) -> {
            if(exception==null){// 没有异常发送成功
                System.out.println("topic :" +metadata.topic());
                System.out.println("分区partition :" +metadata.partition());
                    /*
                        topic :message
                        分区partition :0
                     */
            }else {
                // 打印异常信息
                exception.printStackTrace();
            }
        }));

        // close
        kafkaProducer.close();
    }

上述都是异步发送消息

同步发送 sync

调用 send() 方法,然后再调用 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常,
如果没有发生错误,我们会得到 RecordMetadata 对象,可以用它来查看消息记录。
指定分区发送

    @Test
    void userPortitionsSend(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.8:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);
        /*
         默认的分区规则  DefaultPartitioner
         指定发送到哪个分区 0 后面有个key 空即可
        */
        kafkaProducer.send(new ProducerRecord<>("message", 2,"","hello world  exec callback3"),((metadata, exception) -> {
            if(exception==null){// 没有异常发送成功
                System.out.println("topic :" +metadata.topic());
                System.out.println("分区partition :" +metadata.partition());
                    /*
                        topic :message
                        分区partition :2
                     */
            }else {
                // 打印异常信息
                exception.printStackTrace();
            }
        }));
        kafkaProducer.close();
    }

指定key 按照key的哈希值 对分区取模 映射

        kafkaProducer.send(new ProducerRecord<>("message", "a","hello world  exec callback"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception==null){// 没有异常发送成功
                    System.out.println("topic :" +metadata.topic());
                    System.out.println("分区partition :" +metadata.partition());
                    /*
                        topic :message
                        分区partition :0
                     */
                }
            }
        });

希望把订单表里的所有数据发送到 kafka 的某一个分区 ? 实现 只需在key上放上订单的表名字 —一定会发到一个分区上

自定义分区器

1、需求:实现一个分区器实现,发送过来的数据中如果包含zero就发送0号分区,不包含zero就发往1号分区。
2、定义类实现Partitioner接口
MyPartitioner.java

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // get data
        String msgValue = value.toString();
        int partition;
        if(msgValue.contains("zero")){
            partition=0;
        }else {
            partition=1;
        }
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
    @Test
    void customPartitionSend(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.8:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
		//自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);

        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);
        kafkaProducer.send(new ProducerRecord<>("message", "hello world  exec callback"),((metadata, exception) -> {
            if(exception==null){// 没有异常发送成功
                System.out.println("topic :" +metadata.topic());
                System.out.println("分区partition :" +metadata.partition());
                    /*
                        topic :message
                        分区partition :2
                     */
            }else {
                // 打印异常信息
                exception.printStackTrace();
            }
        }));

        // close
        kafkaProducer.close();
    }

上述方式实现了自定义分区器。

提高生产者吞吐量

    @Test
    void testproducer(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,64*1024*1024);// 64M 缓冲区大小

        //批次大小  batch.size  linger.ms 批次设置32k 延迟设置 5ms  两个合理设置  等5ms 处理
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32*1024*1024);// 批次大小 32K
        //linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,5);// 5ms
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//压缩类型 snappy

        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("message","hello world "+i));
        }

        // close
        kafkaProducer.close();
    }

数据可靠性

acks=0,生产者发送过来数据就不管了,Leader一旦崩掉了,也没有办法。可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,如果应答完,Leader还没同步给Follower副本就挂了,此时新的leader就会产生,新的Leader就没有办法收到原数据(因为生产者已经认为发送成功了)。可靠性中等,效率中等;
-1(all):生产者发送过来的数据,Leader+isr队列里面的所有收齐数据后应答。-1和all等价

	@Test
    void testproducer(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,64*1024*1024);// 64M 缓冲区大小

        //批次大小  batch.size  linger.ms 批次设置32k 延迟设置 5ms  两个合理设置  等5ms 处理
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32*1024*1024);// 批次大小 32K
        //linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,5);// 5ms
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//压缩类型 snappy

        //----
        properties.put(ProducerConfig.ACKS_CONFIG,"1");// acks 数据可靠性 default all
        properties.put(ProducerConfig.RETRIES_CONFIG,3);// 重试次数  default max(int)
        //---
        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("message","hello world "+i));
        }

        // close
        kafkaProducer.close();
    }

幂等性

生产者不论向Broker发送多少次重复数据,Broker端都只会持久化一次,保证了不重复。(幂等性默认开启,只保证单分区单会话内不重复,kafka挂掉再重启还是会产生重复数据)
生产者事务
开启事务必须开启幂等性。(!必须指定事务的id,ack=all)第五条消息发送失败,终止了。

    @Test
    void test(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,64*1024*1024);// 64M 缓冲区大小

        //批次大小  batch.size  linger.ms 批次设置32k 延迟设置 5ms  两个合理设置  等5ms 处理
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32*1024*1024);// 批次大小 32K
        //linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,5);// 5ms
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");//压缩类型 snappy

        //----
        properties.put(ProducerConfig.ACKS_CONFIG,"all");// acks 数据可靠性 default all
        properties.put(ProducerConfig.RETRIES_CONFIG,3);// 重试次数  default max(int)
        //---
        // 必须指定事务id 否则失败 事务id任意取 只要保证全局唯一即可
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");

        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);
        // 初始化 即初始化事务
        kafkaProducer.initTransactions();
        // 开启事务
        kafkaProducer.beginTransaction();
        try {
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("message","hello world "+i));
                if(i==4){
                    int j=1/0;
                }
            }
            kafkaProducer.commitTransaction();
        } catch (ProducerFencedException e) {
            kafkaProducer.abortTransaction();
        }finally {
            // close
            kafkaProducer.close();
        }
    }

消费者

一个消费者去消费某个主题的数据

docker exec -it kafka /bin/bash
cd /opt/kafka_2.13-2.8.1/bin
# 生产者 生产消息
./kafka-console-producer.sh --broker-list 192.168.124.8:9092 --topic message

生产消息。

    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //!!!! 必须配置组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"message");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Arrays.asList("message"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 拉的动作 1s 拉一次
            consumerRecords.forEach(data->{
                System.out.println(data);
            });
        }
    }

消费者消费一个分区

使用生产者对某个分区生产数据

    @Test
    void userPortitionsSend(){
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.8:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // create producer
        KafkaProducer<String,String> kafkaProducer =new KafkaProducer<String, String>(properties);
        /*
         默认的分区规则  DefaultPartitioner
         指定发送到哪个分区 0 后面有个key 空即可
        */
        kafkaProducer.send(new ProducerRecord<>("message", 2,"","hello world  exec callback3"),((metadata, exception) -> {
            if(exception==null){// 没有异常发送成功
                System.out.println("topic :" +metadata.topic());
                System.out.println("分区partition :" +metadata.partition());
                    /*
                        topic :message
                        分区partition :2
                     */
            }else {
                // 打印异常信息
                exception.printStackTrace();
            }
        }));
        kafkaProducer.close();
    }

针对特定分区进行消费

    @Test
    void consumerOnePartition(){
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //!!!! 必须配置组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"message");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("message",2));
        // 订阅主题对应的分区
        consumer.assign(topicPartitions);

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 拉的动作 1s 拉一次
            consumerRecords.forEach(data->{
                System.out.println(data);
            });
        }
    }

offset

kafka默认自动提交offest 默认5s提交一次。
手动提交offest
1、同步提交(commitSync)必须等待offest提交完毕,再去消费下一批数据
2、异步提交(commitAsync)发送完提交offest请求后,就开始消费下一批数据了。
手动提交文章来源地址https://www.toymoban.com/news/detail-767080.html

    @Test
    void commitCustom(){
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //!!!! 必须配置组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"message");
        // 手动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("message",2));
        // 订阅主题对应的分区
        consumer.assign(topicPartitions);

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 拉的动作 1s 拉一次
            consumerRecords.forEach(data->{
                System.out.println(data);
            });
            // 手动提交 同步提交
            consumer.commitSync();
             // 异步提交
            //consumer.commitAsync();
        }
    }

到了这里,关于kafka常见命令介绍和使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【项目实战】Kafka中Topic创建介绍

    Kafka是一种分布式流处理平台,它可以处理实时数据流,支持高吞吐量、低延迟的数据处理。 它通过Topic和Partition机制将消息存储在集群中,并支持高吞吐量的消息发布和订阅。 Topic可以看作是一个消息队列 生产者将消息发送到Topic中,消费者从Topic中消费消息。 生产者将消

    2024年02月09日
    浏览(53)
  • Kafka:Topic概念与API介绍

    事件被组织并持久地存储在 Topic 中, Topic 类似于文件系统中的文件夹,事件就是该文件夹中的文件。 Kafka 中的 Topic 始终是多生产者和多订阅者:一个 Topic 可以有零个、一个或多个生产者向其写入事件,也可以有零个、一个或多个消费者订阅这些事件。 Topic 中的事件可以根

    2024年02月05日
    浏览(96)
  • Kafka - 获取 Topic 生产者发布数据命令

    从头开始获取 20 条数据(等价于时间升序) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --from-beginning --max-messages 20 获取最新 20 条数据(等价于时间降序)去掉 --from-beginning 即可(默认) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --max-me

    2024年02月14日
    浏览(39)
  • kafka2.x常用命令:创建topic,查看topic列表、分区、副本详情,删除topic,测试topic发送与消费

    原创/朱季谦 接触kafka开发已经两年多,也看过关于kafka的一些书,但一直没有怎么对它做总结,借着最近正好在看《Apache Kafka实战》一书,同时自己又搭建了三台kafka服务器,正好可以做一些总结记录。 本文主要是记录如何在kafka集群服务器上创建topic,查看topic列表、分区、

    2024年02月03日
    浏览(48)
  • (五)kafka从入门到精通之topic介绍

    Kafka是一个流行的分布式消息系统,它的核心是一个由多个节点组成的分布式集群。在Kafka中,数据被分割成多个小块,并通过一些复杂的算法在节点之间传递。这些小块被称为Kafka Topic。 一个Topic是一组具有相同主题的消息。可以将Topic看作是一个数据仓库,在这个仓库中存

    2024年02月12日
    浏览(36)
  • kafka通过命令行删除指定topic下所有records

     1、kafka环境配置  由于在windows环境下,在kafka官网下载下来Apache Kafka需要将E:kafka_2.12-3.3.1binwindows下的路径加入到环境变量中,方便直接使用kafka工具,其他系统直接使用bin下的工具即可: 2、配置kafka指定topic的json文件,命名为delete.json,此文件放在任意位置都可: 可以查

    2024年02月14日
    浏览(41)
  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(48)
  • kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    kafka官网 Broker   一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成; Producer   生产者,即向kafka的broker-list发送消息的客户端; Consumer   消费者,即向kafka的broker-list订阅消息的客户端; Consumer Group   消费者组是 逻辑上的一个订阅者 ,由多个

    2024年02月01日
    浏览(61)
  • kafka消费多个topic的使用

    我们在业务中难免遇到一个kafka消费多个topic的消息,本文帮助大家如何在业务中用一个类消费多个topic消息 配置类1

    2024年02月11日
    浏览(64)
  • 使用kafka-clients的Java API操作Kafka集群的Topic

    记录 :464 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文

    2024年02月09日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包