【学习笔记】kafka学习二

这篇具有很好参考价值的文章主要介绍了【学习笔记】kafka学习二。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

生产者-同步消息发送

【学习笔记】kafka学习二

如果生产者发送消息没有收到ack,会阻塞到3s时间,如果还没收到消息,会重试,重试3次

生产者-异步消息发送(缺点:消息丢失情况,同步更优)

生产者发送消息后可以直接执行后面的业务,Broker接收到消息后异步调用生产者提供的callback回调方法

【学习笔记】kafka学习二

 生产者-ack配置prop.put(ProducerConfig.ACKS_CONFIG,"1");

-ack = 0:kafka集群不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息,性能是最高的。

-ack = 1:多个副本之间的leader已经收到消息,并把消息写入本地的log中,才会返回ack给生产者,性能和安全性最均衡。

-ack =-1/all:里面有默认配置min.insync.replicas=1(默认为1即和ack=1相同,推荐配置大于等于2)min.insync.replicas=3即将消息写入本地的log,且三个副本(即follower和leader同步)都同步后才返回。

生产者-关于消息发送的缓冲区

【学习笔记】kafka学习二

 Kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区为32m

prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

Kafka本地线程会去缓冲区中一次拉16k的数据,发送到Broker

prop.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

如果线程拉不到16k,间隔10ms也会将已拉倒的数据发送给Broker

prop.put(ProducerConfig.LINGER_MS_CONFIG,10);

消费者需要将所属的消费组+消费的主体+消费的某个分区+消费的偏移量,将信息提交到集群的_consumer_offsets主题里面

消费者-自动提交

消息poll下来以后直接提交offset

可能丢消息:假设消费者poll消息下来(3条),offset提交3,还没消费消息消费者就宕了,再启动另一条消费者去消费,offset为3,从3以后拉取消费消息,前三条消息丢失未消费

消费者-手动提交

在消费消息时/后提交offset

自动提交设为false

prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

同时消费完消息,手动将offset提交上去

【学习笔记】kafka学习二

 

手动同步提交,程序应用会进行阻塞,而手动异步提交解决了这个问题,但是异步提交没有重试机制。因为如果程序返回提交失败,消息可能会出现重复消费的情况,假设发起异步提交A,此时提交偏移量是2000,同时又发起异步提交B为3000,此时A成功B失败,会将3000回滚到2000,出现消息重复消费

消费消息poll的细节

poll拉取消息设置拉取消息条数:500条——长轮询1秒

ConsumerRecords<String, String> poll = consumer.poll(1000);


    //1、假设拉到500条消息,则直接消费消息
    //2.假设没有拉到500条,如果时间到了,也进行for循环消费消息

prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);


可以根据消费速度来设置,如果两次Poll时间超过30s时间间隔,kafka会认为该消费者消费能力弱,从而踢出消费组,将分区分配给其他消费者
prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);//30秒

消费者指定分区偏移量和时间消费

consumer给broker发送心跳的时间间隔,1s一次
prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);//1s


kafka如果超过10秒没有收到消费者的心跳,则会吧消费和踢出消费组,进行rebalance,把分区重新分配给其他消费者
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10 * 100);

指定主题和分区
consumer.assign(Collections.singleton(new TopicPartition("wangting",0)));

//从头开始消费,可以重复消费消息
consumer.seekToBeginning(Collections.singleton(new TopicPartition("wangting",0)));


//指定位置消费
consumer.seek(new TopicPartition("wangting",0),10);

新消费组消费offset规则

//新消费组消费规则
// earliest 从最早的开始(不记录提交点),如果消费者是新的,则从头,下次则从offset开始
//latest 从最新的开始(记录提交点)从当前分区的最后一条消息的offset+1开始消费(默认!)
//none 报错

prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

 

Kafka线上问题优化

如何防止消息丢失

发送方:ack是1 或者-1/all 可以防止消息丢失,如果要做到99.9999%,ack设成al,把min.insync.replicas配置成分区备份数。消费方:把自动提交改为手动提交。

如何防止消息的重复消费

一条消息被消费者消费多次。如果为了消息的不重复消费,而把生产端的重试机制关闭、消费端的手动提交改成自动提交,这样反而会出现消息丢失,那么可以直接在防治消息丢失的手段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题。幂等性如何保证:

  1. mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
  2. 使用redis或zk的分布式锁(主流的方案)

如何做到顺序消费

发送方:在发送时将ack不能设置0,关闭重试,使用同步发送,等到发送成功再发送下一条。确保消息是顺序发送的

接收方:消息是发送到一个分区中,只能有一个消费组的消费者来接收消息。

因此,kafka的顺序消费会牺牲掉性能。

解决消息积压问题

消息积压会导致很多问题,比如磁盘被打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:

方案一: 在一个消费者中启动多个线程,让多个线程同时消费。一一提升一个消费者的消费能力。

方案二: 如果方案一还不够的话,这个时候可以启动多个消费者,多个消费者部署在不同的服务器上。其实多个消费者部署在同一服务器上也可以提高消费能力一一充分利用服务器的cpu资源。

方案三: 让一个消费者去把收到的消息往另外一个topic上发,另一个topic设置多个分区和多个消费者,进行具体的业务消费。

延迟队列

延迟队列的应用场景:在订单创建成功后如果超过30分钟没有付款,则需要取消订单,此时可用延时队列来实现。

  1. 创建多个topic,每个topic表示延时的间隔
  1. topic_5s:延时5s执行的队列
  2. topic_1m: 延时1分钟执行的队列
  3. topic_30m:延时30分钟执行的队列
  1. 消息发送者发送消息到相应的topic,并带上消息的发送时间
  2. 消费者订阅相应的topic,消费时轮询消费整个topic中的消息
  1. 如果消息的发送时间,和消费的当前时间超过预设的值,比如30分钟
  2. 如果消息的发送时间,和消费的当前时间没有超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费
  3. 下次继续消费该offset处的消息,判断时间是否已满足预设值

JAVA实现生产者消费者

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
import net.sf.jsqlparser.statement.select.Top;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import javax.xml.crypto.Data;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class KafkaTest {
    @Test
    public void consumer(){
        Properties prop = new Properties();
        //设置集群地址
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.137:9092");
        //设置key序列化器
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //设置值的序列化器
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //earliest 从最早的开始(不记录提交点)
        //latest 从最新的开始(记录提交点)
        //none 报错
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //模拟多消费者在同一个消费者分组里G2
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "G2");

        //是否自动提交offset,默认是true
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //自动提交offset时间间隔
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");


        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
                System.out.println(2);
                consumer.subscribe(Collections.singleton("wangting"));
                while (true){
                    ConsumerRecords<String, String> poll = consumer.poll(100);
                    for (ConsumerRecord<String,String> record: poll) {
                        System.out.println(Thread.currentThread().getName()+"\t"+record.offset() + "\t" + record.key() + "\t" + record.value());
                    }
                }
            }).start();
        }
    }

    @Test
    public void oneConsumer(){
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.137:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //新消费组消费规则
        // earliest 从最早的开始(不记录提交点),如果消费者是新的,则从头,下次则从offset开始
        //latest 从最新的开始(记录提交点)从当前分区的最后一条消息的offset+1开始消费(默认!)
        //none 报错
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //分组
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
        //consumer给broker发送心跳的时间间隔
        prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);
        //kafka如果超过10秒没有收到消费者的心跳,则会吧消费和踢出消费组,进行rebalance,把分区重新分配给其他消费者
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10 * 100);
        //poll拉取消息设置拉取消息条数:500条——长轮询1秒
            //1、假设拉到500条消息,则直接消费消息
            //2.假设没有拉到500条,如果时间到了,也进行for循环消费消息
        prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
        //可以根据消费速度来设置,如果两次Poll时间间隔超过30s,kafka会认为该消费者消费能力弱,从而踢出消费组,将分区分配给其他消费者
        prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);



        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        //消费者订阅主题
//        consumer.subscribe(Arrays.asList("wangting"));或
        consumer.subscribe(Collections.singleton("wangting"));
        //指定主题和分区
        consumer.assign(Collections.singleton(new TopicPartition("wangting",0)));
        //从头开始消费,可以重复消费消息
        consumer.seekToBeginning(Collections.singleton(new TopicPartition("wangting",0)));
        //指定位置消费
        consumer.seek(new TopicPartition("wangting",0),10);


        //从指定时间点开始消费
//        List<PartitionInfo> partitionInfos = consumer.partitionsFor("wangting");
//        //从一小时前开始消费
//        long fetchDateTime = new Date().getTime() - 1000 * 60 * 60;
//        Map<TopicPartition,Long> map = new HashMap<>();
//        for(PartitionInfo part : partitionInfos){
//            map.put(new TopicPartition("wangting",part.partition()),fetchDateTime);
//        }
//        Map<TopicPartition,OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
//        for(Map.Entry<TopicPartition,OffsetAndTimestamp> entry : parMap.entrySet()){
//            TopicPartition key = entry.getKey();
//            OffsetAndTimestamp value = entry.getValue();
//            if(key == null || value == null) continue;
//            Long offset = value.offset();
//            //根据消费者的timestamp确定offset
//            if(value != null){
//                consumer.assign(Arrays.asList(key));
//                consumer.seek(key ,offset);
//            }
//        }




        //一个消费者组G1里只有一个消费者
        while (true){
            //如果1s内每1s内没有poll到任何消息,则继续poll消息,知道Poll到消息。如果超过1s长轮询结束
            ConsumerRecords<String, String> poll = consumer.poll(1000);
            for (ConsumerRecord<String,String> record: poll) {
                System.out.println(record.offset() + "\t" + record.key() + "\t" + record.value());

            }
            //消息到这已经全部消费完
            if(poll.count() > 0){
                //有消息,手动同步提交offset,当前线程会阻塞,直到offset提交成功
                consumer.commitSync();

                //手动异步提交,当前线程提交offset不会阻塞,可以继续处理后面的业务
//                consumer.commitAsync(new OffsetCommitCallback() {
//                    @Override
//                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
//                        if(e != null){
//                            System.out.println("给"+map+"提交失败"+e.getStackTrace());
//                        }
//                    }
//                });
            }
        }
    }

    @Test
    public void oneProducer() throws InterruptedException {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.235.137:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        //重试次数-10次
        prop.put(ProducerConfig.RETRIES_CONFIG,10);

        //重试间隔设置,300ms
        prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);

        //设置缓冲区大小
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        //kafka本地线程从缓冲区取数据,批量发送给Broker,默认值16kb,即16384,batch满16k就发送出去
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        prop.put(ProducerConfig.LINGER_MS_CONFIG,10);

        //ack配置
        //prop.put(ProducerConfig.ACKS_CONFIG,"1");
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

        //要发五条消息
        int msgNum = 5;
        final CountDownLatch downLatch = new CountDownLatch(msgNum);
        for(int i = 0;i < 5;i++){
            SimpleDateFormat format = new SimpleDateFormat();
            Date date = new Date(System.currentTimeMillis());
            ProducerRecord<String,String> record = new ProducerRecord<>("wangting",format.format(date));

            //异步,发五条就要五个反馈
            producer.send(record,new Callback(){
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e != null){
                        System.out.println("发送消息失败");
                    }
                    if(recordMetadata != null){
                        System.out.println("异步发送消息"+recordMetadata.topic()+recordMetadata.partition()+recordMetadata.offset());
                    }
                    downLatch.countDown();//减一
                }
            });

            //同步
            //        try{
//            producer.send(record);
//
//        }catch (Exception e){
//            e.printStackTrace();
//        }
        }

        Thread.sleep(1000000);
        downLatch.await(5, TimeUnit.SECONDS);//如果不是0继续等5秒
        producer.close();
    }
}

SpringBoot整合Kafka

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    bootstrap-servers: 192.168.235.137:9092,192.168.235.137:9093,192.168.235.137:9094
    producer:
      # 重试次数,默认Integer.MAX_VALUE
      retries: 1
      # 同一批次内存大小(默认16K)
      batch-size: 16384
      # 生产者内存缓存区大小(32M)
      buffer-memory: 33554432
      # key和value的序列化(默认,可以不设置)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # ack应答机制,默认1,即只需要确认leader收到消息
      acks: 1
        # springboot1.5.16自动装配中不支持properties下的其他配置,不知道为啥。2.x版本可以
        #properties:
        # 使用自定义的分区选择器
      #{partitioner.class: com.msy.kafka.MyPartition, acks: all}
    consumer:
      group-id: test
      enable-auto-commit: false
      # earliest:从头开始消费   latest:从最新的开始消费   默认latest
      auto-offset-reset: latest
      # key和value反序列化(默认,可以不设置)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 消费者并发能力
      concurrency: 6
      # 设置手动提交的时候,需要设置ackMode
#      RECORD :当listener一读到消息,就提交offset
#      BATCH: poll() 函数读取到的所有消息,就提交offset
#      TIME: 当超过设置的ackTime ,即提交Offset
#      COUNT :当超过设置的COUNT,即提交Offset
#      COUNT_TIME :TIME和COUNT两个条件都满足,提交offset
#      MANUAL : 当每批poll的消息全部处理完,Acknowledgment.acknowledge()即提交Offset,和Batch类似
#      MANUAL_IMMEDIATE: 只要调用Acknowledgment.acknowledge()即提交Offset
      ack-mode: MANUAL
    topic: wangting
@RestController
public class MyKafkaController {
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/send")
    public String senMessage(){
        kafkaTemplate.send("wangtng",0,"key","this is a message");
        return "send success;";
    }

}

 文章来源地址https://www.toymoban.com/news/detail-478057.html

@Component
public class MyConsumer {
    //消费xx主题,为x消费组
//    @KafkaListener(topics = "wangting",groupId = "test")
//    public void listenGroup(ConsumerRecord<String,String> record, Acknowledgment ack){
//        //对每条消息处理
//        String value = record.value();
//        System.out.println(value);
//        System.out.println(record);
//        //手动提交时如果不调用这个方法,消息会重复消费
//        ack.acknowledge();a
//    }

    @KafkaListener(groupId = "test",topicPartitions = {
            @TopicPartition(topic = "topic1",partitions = {"0","1"}),
            @TopicPartition(topic = "topic2",partitions = "0",partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))},
            //在消费topic2主题时,消费0分区不指定,消费1号分区从offset100位置消费
    concurrency = "3")//在这个组下,kafka创建3个消费者去消费,并发消费,建议小于等于分区总数
    public void listenGroup(ConsumerRecord<String,String> record, Acknowledgment ack){
        //对每条消息处理
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手动提交时如果不调用这个方法,消息会重复消费
        ack.acknowledge();
    }
}

到了这里,关于【学习笔记】kafka学习二的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka生产者异步发送、同步发送、回调异步发送,是什么情况?

    Kafka是一种分布式流处理平台 ,它是一种高吞吐量、可扩展、可持久化的消息队列系统,用于处理和存储实时流式数据。 Kafka基于发布-订阅模式,采用了分布式、多副本、分区的架构。它允许生产者将数据以消息的形式发送到Kafka集群的一个或多个主题(topic)中,而消费者可以

    2024年02月15日
    浏览(39)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(54)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(48)
  • kafka服务端允许生产者发送最大消息体大小

            server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。         在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息

    2024年02月15日
    浏览(46)
  • Kafka 入门到起飞系列 - 生产者发送消息流程解析

    生产者通过 producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 生产者通过 send() 方法发送消息,send()方法会经过如下几步 1. 首先将消息交给 拦截器(Interceptor) 处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的

    2024年02月16日
    浏览(48)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(46)
  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(78)
  • Kafka中的生产者如何处理消息发送失败的情况?

    在Kafka中,生产者可以通过以下方式处理消息发送失败的情况: 同步发送模式(Sync Mode):在同步发送模式下,生产者发送消息后会阻塞等待服务器的响应。如果发送失败,生产者会抛出异常(例如 ProducerRecord 发送异常)或返回错误信息。开发者可以捕获异常并根据需要进行

    2024年02月06日
    浏览(44)
  • kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    报这个错误是因为kafka里的配置要修改下 在config目录下 server.properties配置文件 这下发送消息就不会一直等待,就可以发送成功了

    2024年02月06日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包