架构师系列- 消息中间件(13)-kafka深入应用

这篇具有很好参考价值的文章主要介绍了架构师系列- 消息中间件(13)-kafka深入应用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

4、深入应用

4.1 springboot-kafka

1)配置文件

 kafka:
    bootstrap-servers: 192.168.10.30:10903,192.168.10.30:10904
    producer: # producer 生产者
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 # 一次最多发送数据量
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer: # consumer消费者
      group-id: javagroup # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
      auto-offset-reset: latest  #earliest,latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 2)启动信息

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

4.2 消息发送

4.2.1 发送类型

KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法

详细代码参考:AsyncProducer.java

消费者使用:KafkaConsumer.java

1)同步发送

     ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
                //注意,可以设置等待时间,超出后,不再等候结果
        SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
        logger.info("send result:{}",result.getProducerRecord().value());

通过swagger发送,控制台可以正常打印send result

swagger访问地址:http://localhost:8080/doc.html

2)阻断

在服务器上,将kafka暂停服务

docker-compose -f km.yml pause kafka-1 kafka-2

在swagger发送消息

调同步发送:请求被阻断,一直等待,超时后返回错误

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

而调异步发送的(默认发送接口),请求立刻返回。

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

那么,异步发送的消息怎么确认发送情况呢???往下看!

3)注册监听

代码参考: KafkaListener.java (释放注解)

可以给kafkaTemplate设置Listener来监听消息发送情况,实现内部的对应方法

 kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {});

查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener

com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}

 启动kafka

docker-compose unpause kafka-1 kafka-2

 再次发送消息时,同步异步均可以正常收发,并且监听进入success回调

com.itheima.demo.config.KafkaListener$1:ok,message={"message":"1","sendTime":1610089315395}
com.itheima.demo.controller.PartitionConsumer:patition=1,message:[{"message":"1","sendTime":1610089315395}]

可以看到,在内部类 KafkaListener$1 中,即注册的Listener的消息。

4.2.2 序列化

消费者使用:KafkaConsumer.java

1)序列化详解

  • 前面用到的是Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)
  • 基本上,可以满足绝大多数场景

2)自定义序列化

自己实现,实现对应的接口即可,有以下方法:

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    //理论上,只实现这个即可正常运行
    byte[] serialize(String var1, T var2);

    //默认调上面的方法
    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

案例,参考: MySerializer.java

在yaml中配置自己的编码器

value-serializer: com.test.demo.config.MySerializer

 重新发送,发现:消息发送端编码回调一切正常。但是消费端消息内容不对!

com.itheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}
com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"

3)解码

发送端有编码并且我们自己定义了编码,那么接收端自然要配备对应的解码策略

代码参考:MyDeserializer.java,实现方式与编码器几乎一样!

在yaml中配置自己的解码器

value-deserializer: com.itheima.demo.config.MyDeserializer

再次收发,消息正常

com.itheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}
com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}

4.2.3 分区策略

分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定的分区里面去
  • 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
  • 既没有给定分区号,也没有给定key值,直接轮循进行分区
  • 自定义分区,你想怎么做就怎么做

1)验证默认分区规则

发送者代码参考:PartitionProducer.java

消费者代码使用:PartitionConsumer.java

通过swagger访问setKey:

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

 架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

再访问setPartition来设置分区号0来发送

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式 架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

2)自定义分区

你想自己定义规则,根据我的要求,把消息投放到对应的分区去? 可以!

参考代码:MyPartitioner.java , MyPartitionTemplate.java ,

发送使用:MyPartitionProducer.java

使用swagger,发送0开头和非0开头两种key试一试!

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

备注:

自己定义config参数,比较麻烦,需要打破默认的KafkaTemplate设置

可以将KafkaConfiguration.java中的getTemplate加上@Bean注解来覆盖系统默认bean

这里为了避免混淆,采用@Autowire注入。

4.3 消息消费

4.3.1 消息组别

发送者使用:KafkaProducer.java

1)代码参考:GroupConsumer.java,Listener拷贝3份,分别赋予两组group,验证分组消费:

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

 架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

通过swagger发送2条消息

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式 

  • 同一group下的两个消费者,在group1均分消息
  • group2下只有一个消费者,得到全部消息

4)消费端闲置

注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置,浪费资源!

验证方式:

停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。

重新发送两条消息,试一试

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

解析:

group2可以消费到1、2两条消息

group1下有两个消费者,但是只分配给了 -1 , -2这个进程被闲置

4.3.2 位移提交

1)自动提交

前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)

2)手动提交

有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。

下面我们自己定义配置,覆盖上面的参数

代码参考:MyOffsetConfig.java

通过在消费端的Consumer来提交偏移量,有如下几种方式:

代码参考:MyOffsetConsumer.java

同步提交、异步提交:manualCommit() ,同步异步的差别,下面会详细讲到。

指定偏移量提交:offset()

3)重复消费问题

如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!

代码参考和对比:manualCommit() , noCommit()

验证过程:

用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他Consumer的Component注解,只保留当前MyOffsetConsumer.java 启动项目,使用swagger的KafkaProducer发送连续几条消息 留心控制台,都能消费,没问题:

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

但是!重启试试:

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!

再通过命令行查询偏移量试试:

 架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

4)经验与总结

commitSync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会。

这就造成一个陷阱:
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。

但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进程。就会造成重复消费!

因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
详细代码参考:MyOffsetConsumer.manualOffset()

5、高级特性

5.1 扩展性

5.1.1 broker扩容

1)在yaml中复制kafka-2,拷贝为新的节点,注意以下标注修改的地方!

#修改后的内容参考:cluster.yml

    kafka-3: #改
        container_name: kafka-3 #改
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10905:9092 #改
        environment:
            KAFKA_BROKER_ID: 3 #改
            HOST_IP: 192.168.10.30
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
            KAFKA_ADVERTISED_PORT: 10905 #改
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 

 完整的 cluster.yml

version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13

    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
        environment:
            KAFKA_BROKER_ID: 1
            HOST_IP: 192.168.10.30
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
            KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
            KAFKA_ADVERTISED_PORT: 10903
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
        environment:
            KAFKA_BROKER_ID: 2
            HOST_IP: 192.168.10.30
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
            KAFKA_ADVERTISED_PORT: 10904
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper
    km:
        image: liggdocker/km:2002
        ports:
            - 10906:9000
        depends_on:
            - zookeeper
    kafka-3: #改
        container_name: kafka-3 #改
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10905:9092 #改
        environment:
            KAFKA_BROKER_ID: 3 #改
            HOST_IP: 192.168.10.30
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
            KAFKA_ADVERTISED_PORT: 10905 #改
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper

 2)更新docker集群信息

docker-compose -f cluster.yml up -d
#启动消息

kafka_zookeeper_1 is up-to-date
kafka_km_1 is up-to-date
kafka-1 is up-to-date
kafka-2 is up-to-date
Creating kafka-3 ... done

3)进命令行,或打开km查看新的broker信息

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

5.1.2 分区扩容

1)使用km对test主题增加分区到3个,看分区分配机器情况

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式 可以指定新分区数量,及分配到的机器

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

2)注意问题

新加分区或重新调整分区,已经启动的客户端会动态更新对应的分配信息,不需要重启。

但是!!!

在同步变更消息的过程中有可能会丢失消息!想想为什么?(答案在下面)

(注意!以下场景不保证100%会重现!)

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

答案:

回顾一下消费偏移量的默认提交配置:latest,因为新分区没有任何offset提交记录

所以会在重新分配分区后从末尾开始消费!

那么分配前的那些消息就不会消费到。而分配后再发送的不会受影响,可以正常消费

分区分配正常后,查看偏移量提交信息,没问题:

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

km的Consumer页签里也可以查看偏移量信息:

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式

5.2 高可用

以上动态扩容操作是怎么实现的呢?集群中必然有一个节点协调了相关操作。

这台协调者,就是controller节点。

controller节点是其中的一台broker,所有broker都有可能成为controller

当前controller宕机后,其他就会参与竞争,选出新的controller,保持集群对外的高可用

5.2.1 节点选举

1)查找controller,找到它所在的broker

1)查找controller,找到它所在的broker

#查找docker进程,找到zookeeper的容器
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker ps --format "table{{.ID}}\t{{.Names}}\t{{.Ports}}"
CONTAINER ID        NAMES               PORTS
75318748caab        kafka-3             0.0.0.0:10905->9092/tcp
4807d188a180        kafka_km_1          0.0.0.0:10906->9000/tcp
4453eb0b2a36        kafka-2             0.0.0.0:10904->9092/tcp
d6fd814a0851        kafka-1             0.0.0.0:10903->9092/tcp
8c1fc2cc6e9a        kafka_zookeeper_1   2181/tcp, 2888/tcp, 3888/tcp

#进入容器,连上zk
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker exec -it kafka_zookeeper_1 sh
/zookeeper-3.4.13 #
/zookeeper-3.4.13 # zkCli.sh
Connecting to localhost:2181

#查询当前controller是哪个节点,发现是2号机器(有可能是其他节点,找到这个brokerid,下面要用!)
[zk: localhost:2181(CONNECTED) 6] get /controller
{"version":1,"brokerid":2,"timestamp":"1610500701187"}

#controller变更的次数
[zk: localhost:2181(CONNECTED) 7] get /controller_epoch
1
	

2)docker-compose停掉它!

#docker pause 暂停容器的服务,注意是上面找到的那台broker
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker pause kafka-2
kafka-2

#查看状态,发现(Paused)
[root@iZ8vb3a9qxofwannyywl6zZ ~]# docker ps | grep kafka-2
4453eb0b2a36        wurstmeister/kafka:2.12-2.2.2                    "start-kafka.sh"         2 days ago          Up 2 days (Paused)   0.0.0.0:10904->9092/tcp                                                                      kafka-2

#再次按 1)的步骤进入zk容器,查看当前controller,已经变为3号
[zk: localhost:2181(CONNECTED) 0] get /controller
{"version":1,"brokerid":3,"timestamp":"1610679583216"}

#变更次数加了1
[zk: localhost:2181(CONNECTED) 1] get /controller_epoch
2

5.2.2 原理剖析

当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的/controller临时节点就会被清除。

Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建它。

第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。

每个新选举出来的控制器,会在Zookeeper系统中递增controller_epoch的值。

附:详细流程图

架构师系列- 消息中间件(13)-kafka深入应用,kafka,分布式 文章来源地址https://www.toymoban.com/news/detail-861620.html

到了这里,关于架构师系列- 消息中间件(13)-kafka深入应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息中间件之Kafka(一)

    高性能的消息中间件,在大数据的业务场景下性能比较好,kafka本身不维护消息位点,而是交由Consumer来维护,消息可以重复消费,并且内部使用了零拷贝技术,性能比较好 Broker持久化消息时采用了MMAP的技术,Consumer拉取消息时使用的sendfile技术 Kafka是最初由Linkedin公司开发,

    2024年01月20日
    浏览(51)
  • 消息中间件之Kafka(二)

    1.1 为什么要对topic下数据进行分区存储? 1.commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上, 相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据 2.提高并行度 1.2 如何在多个partition中保证顺序消费? 方案一

    2024年01月21日
    浏览(49)
  • 深入理解Java消息中间件-组件-消息队列

    引言: 消息中间件在现代分布式系统中扮演着至关重要的角色,它解决了系统之间异步通信和解耦的需求。而在消息中间件的架构中,核心组件之一就是消息队列。本文将深入探讨消息队列的架构组件,帮助读者加深对消息中间件的理解和应用。 一、什么是消息队列 消息队列

    2024年04月27日
    浏览(51)
  • Kafka消息中间件(Kafka与MQTT区别)

    Kafka是一个分布式流处理平台,它可以快速地处理大量的数据流。Kafka的核心原理是基于 发布/订阅 模式的消息队列。Kafka允许多个生产者将数据写入主题(topic)中,同时也允许多个消费者从主题中读取数据。 Kafka重要原理 Kafka的设计原则之一是高可用性和可扩展性,因此它

    2024年02月03日
    浏览(48)
  • 【Java面试丨消息中间件】Kafka

    1. 介绍 使用kafka在消息的收发过程都有可能会出现消息丢失 (1)生产者发送消息到broker丢失 (2)消息在broker中存储丢失 (3)消费者从broker接收消息丢失 2. 生产者发送消息到broker丢失 设置异步发送:同步发送会发生阻塞,一般使用异步发送方式发送消息 消息重试:由于网

    2024年02月11日
    浏览(47)
  • 深入理解Java消息中间件-消息追踪和日志管理

    在分布式系统中,确保系统的稳定性和可靠性是一个极其复杂和挑战性的任务。随着系统的规模增大和组件间交互的复杂性提升,问题定位和故障排除变得越来越困难。在这种背景下,消息追踪和日志管理成为了日常工作中不可或缺的一部分,它们为开发和运维团队提供了宝

    2024年04月28日
    浏览(50)
  • 消息中间件,RabbitMQ,kafka常见面试题

    RabbitMQ和Kafka都是消息队列系统,可以用于流处理。流处理是指对高速、连续、增量的数据进行实时处理。 RabbitMQ 和 Kafka 的相同点有以下几个: 都是消息队列系统,可以用于流处理、异步通信、解耦等场景 都是开源的,有活跃的社区和丰富的文档 都支持分布式部署,具有高

    2024年02月04日
    浏览(39)
  • 深入了解 RabbitMQ:高性能消息中间件

    在现代分布式系统中,消息队列成为了实现系统间异步通信、削峰填谷以及解耦组件的重要工具。而RabbitMQ作为一个高效可靠的消息队列解决方案,已经成为许多企业广泛采用的选择。本文将介绍RabbitMQ的基本概念、主要特性以及常见应用场景。 RabbitMQ 是一个开源的高性能、

    2024年02月08日
    浏览(51)
  • 消息中间件系列 - RocketMQ

    本内容仅用于个人学习笔记,如有侵扰,联系删除 【尚硅谷】RocketMQ教程丨深度掌握MQ消息中间件_哔哩哔哩_bilibili 1 、MQ简介 MQ , Message Queue ,是一种提供 消息队列服务 的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。

    2024年02月16日
    浏览(73)
  • 深入详解高性能消息队列中间件 RabbitMQ

      目录 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 优势 4、RabbitMQ 整体架构剖析 4.1、发送消息流程 4.2、消费消息流程 5、RabbitMQ 应用 5.1、广播 5.2、RPC VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++软件异常排查从入

    2024年02月05日
    浏览(79)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包