Kafka收发消息核心参数详解

这篇具有很好参考价值的文章主要介绍了Kafka收发消息核心参数详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、从基础的客户端说起

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.13</artifactId>
  <version>3.4.0</version>
 </dependency>

1.1、消息发送者主流程

​ 然后可以使用Kafka提供的Producer类,快速发送消息。

public class MyProducer {
   private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
   private static final String TOPIC = "disTopic";

   public static void main(String[] args) throws ExecutionException, InterruptedException {
       //PART1:设置发送者相关属性
       Properties props = new Properties();
       // 此处配置的是kafka的端口
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
       // 配置key的序列化类
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       // 配置value的序列化类
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

       Producer<String,String> producer = new KafkaProducer<>(props);
       CountDownLatch latch = new CountDownLatch(5);
       for(int i = 0; i < 5; i++) {
           //Part2:构建消息
           ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
           //Part3:发送消息
           //单向发送:不关心服务端的应答。
           producer.send(record);
           System.out.println("message "+i+" sended");
           //同步发送:获取服务端应答消息前,会阻塞当前线程。
           RecordMetadata recordMetadata = producer.send(record).get();
           String topic = recordMetadata.topic();
           int partition = recordMetadata.partition();
           long offset = recordMetadata.offset();
           String message = recordMetadata.toString();
           System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);
           //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
           producer.send(record, new Callback() {
               @Override
               public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                   if(null != e){
                       System.out.println("消息发送失败,"+e.getMessage());
                       e.printStackTrace();
                   }else{
                       String topic = recordMetadata.topic();
                       long offset = recordMetadata.offset();
                       String message = recordMetadata.toString();
                       System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
                   }
                   latch.countDown();
               }
           });
       }
       //消息处理完才停止发送者。
       latch.await();
       producer.close();
   }
}

​ 整体来说,构建Producer分为三个步骤:

  • 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  • 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  • 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

1.2、消息消费者主流程

​ 接下来可以使用Kafka提供的Consumer类,快速消费消息。

public class MyConsumer {
   private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
   private static final String TOPIC = "disTopic";

   public static void main(String[] args) {
       //PART1:设置发送者相关属性
       Properties props = new Properties();
       //kafka地址
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
       //每个消费者要指定一个group
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
       //key序列化类
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
       //value序列化类
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
       Consumer<String, String> consumer = new KafkaConsumer<>(props);
       consumer.subscribe(Arrays.asList(TOPIC));
       while (true) {
           //PART2:拉取消息
           // 100毫秒超时时间
           ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
           //PART3:处理消息
           for (ConsumerRecord<String, String> record : records) {
               System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
           }
           //提交offset,消息就不会重复推送。
           consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
       }
   }
}

​ 整体来说,Consumer同样是分为三个步骤:

  • 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  • 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  • 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。
    ​ Kafka的客户端基本就是固定的按照这三个大的步骤运行。在具体使用过程中,最大的变数基本上就是给生产者和消费者的设定合适的属性。这些属性极大的影响了客户端程序的执行方式。

2、从客户端属性来梳理客户端工作机制

​ 渔与鱼:Kafka的客户端API的重要目的就是想要简化客户端的使用方式,所以对于API的使用,尽量熟练就可以了。对于其他重要的属性,都可以通过源码中的描述去学习,并且可以设计一些场景去进行验证。其重点,是要逐步在脑海之中建立一个Message在Kafka集群中进行流转的基础模型。

​ 其实Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐,那才是Kafka最为精妙的地方。但是要理解那些复杂的问题,都是需要建立在这个基础模型基础上的。

2.1、消费者分组消费机制

​ 这是我们在使用kafka时,最为重要的一个机制,因此最先进行梳理。

​ 在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。他的描述是这样的:

  public static final String GROUP_ID_CONFIG = "group.id";
  public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

既然这里提到了kafka-based offset management strategy,那是不是也有非Kafka管理Offset的策略呢?

另外,还有一个相关的参数GROUP_INSTANCE_ID_CONFIG,可以给组成员设置一个固定的instanceId,这个参数通常可以用来减少Kafka不必要的rebalance。

​ 从这段描述中看到,对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。这个分组消费机制简单描述就是这样的:

Kafka收发消息核心参数详解,MQ,kafka

​ 生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。这就是消费者组的作用。

​ 与之相关的还有Offset偏移量。这个偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中,可以看到消费者组的Offset记录情况。文章来源地址https://www.toymoban.com/news/detail-729616.html

[oper@worker1 bin]$ ./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

到了这里,关于Kafka收发消息核心参数详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【SpringBoot框架篇】35.kafka环境搭建和收发消息

    kafka依赖java环境,如果没有则需要安装jdk kafka3.0版本后默认自带了zookeeper,3.0之前的版本需要单独再安装zookeeper,我使用的最新的3.6.1版本。 cd到kafka的安装根目录后,执行下面命令指令 zookeeper.properties 文件路径启动zookeeper,默认启动的zk服务使用内存是512m,可以修改 zookeeper-server

    2024年01月18日
    浏览(33)
  • 消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在构建分布式系统时,选择适合的消息中间件是至关重要的决策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是当前流行的消息中间件之一,它们各自具有独特的特点和适用场景。本文将对这四种消息中间件进行综合比较,帮助您在项目中作出明智的选择。 1. RabbitMQ 特点: 消息模

    2024年02月20日
    浏览(38)
  • 【MQ】最强Kafka详解

    kafka是一款分布式、支持分区的、多副本,基于zookeeper协调的分布式消息系统。最大的特性就是可以实时处理大量数据来满足需求。 日志收集:可以用kafka收集各种服务的日志 ,通过已统一接口的形式开放给各种消费者。 消息系统:解耦生产和消费者,缓存消息。 用户活动

    2024年02月11日
    浏览(34)
  • Kafka消息队列核心概念以及部署

    2023年06月29日
    浏览(39)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(35)
  • 领航分布式消息系统:一起探索Apache Kafka的核心术语及其应用场景

            本文是Kafka系列文章的第一篇,将带你了解Kafka的核心术语及其应用场景,后续会逐步探索其各方面的原理及应用场景。下面先看一张大概得简图,涉及Kafka的功能、原理等等,后续不断深入介绍,欢迎关注。         消息中间件(message queue middleWare, MQ)指利用高

    2024年01月21日
    浏览(32)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(37)
  • Kafka消息消费流程详解

    在分布式系统中,Kafka是一种常用的消息队列系统,用于实现高可靠性的消息传递。本文将介绍Kafka消息消费的流程,并提供相应的示例代码。 Kafka消费者的流程可以概括为以下几个步骤: 创建Kafka消费者实例; 订阅一个或多个主题; 拉取消息记录; 处理消息; 提交消费位

    2024年02月09日
    浏览(61)
  • Kafka事务是怎么实现的?Kafka事务消息原理详解(文末送书)

    大家好,我是哪吒。 前两天,有个朋友去面试,被问到Kafka事务的问题。 她的第一反应是: 我是来面试Java的,怎么问我大数据的Kafka? 文末送5本《Spring Boot 3核心技术与最佳实践》 不过Kafka确实是Java程序员必备的中间件技术了,这点是毋庸置疑的。 Kafka几乎是当今时代背景

    2024年02月04日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包