一篇搞定Kafka

这篇具有很好参考价值的文章主要介绍了一篇搞定Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 目录

1、Kafka的四个角色解释

2、Kafka与zookeeper的关系与环境搭建

3、Kafka入门小案例

4、Kafka分区机制

4.1、Topic在分区下如何存储消息

​4.2、消息的分区策略

5、Kafka高可用设计方案

5.1、集群

5.2、备份机制(Replication)

5.2.1、两种追随者

6、生产者详解

6.1、参数配置

7、消费者详解

7.1、消费者组

​7.2、消息有序性

​ 7.3、提交偏移量带来的问题及解决方案

7.3.1、自动提交

重复消费

消息丢失

7.3.2、手动提交

同步提交 

异步提交 

同步加异步

8、封装消息的方式


1、Kafka的四个角色解释

Kafka官网

kafka官网:http://kafka.apache.org/ 

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

producer:发布消息的对象称之为主题生产者(Kafka topic producer)

topic:对于生产者生产的消息分门别类,每一类消息称之为一个主题(Topic)

consumer:订阅主题并负责消费主题里生产者生产的消息称之为主题消费者(consumer)

broker:生产者发布的消息保存在一组服务器中,称之为Kafka集群Kafka Cluster,集群中的每一个服务器都是一个代理(Broker),消费者可以订阅一个或多个主题topic,并从Broker拉数据,从而消费这些已发布的消息

2、Kafka与zookeeper的关系与环境搭建

Kafka对于zookeeper是强依赖,zookeeper相当于Kafka的注册中心保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器:

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器:

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

注意:换成自己镜像的IP地址

3、Kafka入门小案例

POM.XML

  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
  </dependency>

生产者

application.yml

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
/**
 * @author JiaWei
 * 生产者
 */
public class ProducerQuikStart {

    public static void main(String[] args) {

        //Kafka配置信息
        Properties prop = new Properties();
        //连接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //key和value进行序列化网络传输
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //创建Kafka生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);

        //发送消息
        /**
         * 1.主题
         * 2.key
         * 3.消息
         */
        ProducerRecord<String,String> kvproducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
        producer.send(kvproducerRecord);

        //释放 
        producer.close();

    }
}

消费者

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

同一个消费者组里的消费者同时订阅一个主题,再有最新的消息时,这条消息只能被一个消费者消费

如果想让一条消息被所有消费者接收,那么就需要给多个消费者在不同的消费者组里面

/**
 * @author JiaWei
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {

        //kafka消费者配置信息
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //key 和 value反序列化
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        //设置消费者组 当前消费者在那个消费者组
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");

        //消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);

        //订阅主题
        consumer.subscribe(Collections.singletonList("topic-first"));

        //拉取消息
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }
    }
}

4、Kafka分区机制

概述:分区就相当于一个文件夹其中的文件就是Topic文件里的内容就是消息 ,且分区建立在Borker服务器中,多个Borker建立成Kafka集群,分区解决的问题就是假如所有的Topic都存储在一台服务器上如果数据量特别大可能会存储不了,所以Kafka为了存储更多的数据不受一台服务器的限制,允许你的Topic存储在不同机器Broker上,所以在创建Topic时可以存储在不同的分区下面。 

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

4.1、Topic在分区下如何存储消息

发送消息时可以把topic存储在3个分区下面,每个分区下面消息都是有顺序的,并且有对应的编号进行记录且是连续自增的,它称之为偏移量(offset),就可以知道你的消息在分区的什么位置

4.2、消息的分区策略

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

5、Kafka高可用设计方案

5.1、集群

一个Kafka集群由多个Broker组成,这样集群一台机器宕机,其他机器上的Broker也依然能够对外提供服务

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

5.2、备份机制(Replication)

备份机制解决消息丢失,消息持久化

生产者在连接上Kafka集群后发送消息会先在领导者副本(Leader Replica)进行存储,而后由领导者副本将消息同步到追随者副本(Follower Replica)中进行备份防止消息丢失,且追随者副本有多个。

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

5.2.1、两种追随者

追随者有两种分别是ISR和普通分别为同步和异步,同步数据为强一致性,异步保证可用性

如果leader失效,优先选取ISR列表中的follower因为数据较新,如果ISR列表的follower都不行了,次选其他follower

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

极端情况:所有副本都失效了一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

6、生产者详解

生产者发送消息有两种方式

同步发送:如果消息特别多消费者处理太慢就会造成阻塞

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

异步发送:消费者消费的慢也不会造成阻塞,不用等到消费者消费完消息,生产者才会发送下一条消息

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

6.1、参数配置

消息确认机制ack,开发中一般不会设置默认值为acks=1也就是leader在接收到消息保存后返回给生产者就完事了。

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

生产者消息重发机制retries

leader如果长时间没有给生产者回执,那么生产者就会重复发送消息,我们则可以设置次数,默认情况下,生产者会在每次重试之间等待100ms

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

参数详解-消息压缩

降低消息传输网络带宽的开销,也会降低存储消息的空间

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

7、消费者详解

7.1、消费者组

指的是由一个或多个消费者组成的群体,一条消息在topic上分发给消费者组中的消费者,如果所有消费者都在一个消费者组中则消息只能被一个消费者消费且只有一个消费者能接收到消息,反之如果想让所有消费者都接收到消息那么就给每个消费者分配一个消费者组。

7.2、消息有序性

由于生产消息和消费消息是同时进行的他保证不了消息的顺序性,如果想要保证则需要单独指定分区以及对应唯一的消费者因为消息是有offerset偏移量的消费者按照偏移量进行消费就能保证消费的顺序性

 7.3、提交偏移量带来的问题及解决方案

消费者消费完消息会提交偏移量标注消费到分区的什么位置分为两种方式

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

7.3.1、自动提交

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

Kafka会默认每五秒主动提交一次偏移量,这种方式面临两个问题 重复消费及消息丢失

重复消费

同一个消费者组中的消费者负责消费对应的分区消息,如果消费者组中的其中一个消费者挂掉了,那么它负责的分区中的消息就由同一个消费者组中的消费者负责进行消费,如果由于网络波动,突然宕机、等原因,造成挂掉消费者提交的偏移量是在10的位置但是你实际提交的是5的位置还没来得及提交你就挂掉了,那么接手你负责分区的消费者就会从5的位置进行消费,这样就造成重复消费了

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

消息丢失

同一个消费者组中的消费者负责消费对应的分区消息,如果消费者组中的其中一个消费者挂掉了,那么它负责的分区中的消息就由同一个消费者组中的消费者负责进行消费,如果由于网络波动、突然宕机、等原因,造成消费者其实才消费到分区偏移量5的位置但是它实际提交的偏移量是10的位置,那么接手挂掉消费者对应分区的消费者就会从10的位置往后消费,那么10之前的消息还没来得及消费就被丢失了

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

7.3.2、手动提交

一篇搞定Kafka,kafka,java,后端,spring cloud,spring boot

同步提交 

同步提交偏移量如果出现异常可能造成阻塞,那么他就会一直等响应然后进行重试提交直到成功,在消费完消息的代码下将提交偏移量代码写上

​
while (true){   
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {                    
         System.out.println(record.value());
         System.out.println(record.key());
         try {
             consumer.commitSync();//同步提交当前最新的偏移量   
         }catch (CommitFailedException e){
             System.out.println("记录提交失败的异常:"+e);
          }  
      }
}

异步提交 

异步提交不会造成阻塞,但是异步提交不会进行重试,异步没有重试是因为如果同时存在多个异步提交可能会导致偏移量的覆盖,就好比现在都能去提交并没有造成阻塞,多个提交同时进行就可能会重复覆盖,所以没有重试

while (true){
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));    
   for (ConsumerRecord<String, String> record : records) {        
       System.out.println(record.value());
       System.out.println(record.key());
    }  
   consumer.commitAsync(new OffsetCommitCallback() {
       @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
           if(e!=null){
               System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
          }       
      } 
   });
}

同步加异步

如果异步提交失败之后他先去记录日志,然后因为异步已经提交失败了那么同步再去提交,这样就能保证偏移量成功的提交,所以说这两种方式结合起来会比较好,保证能够重试也能保证不阻塞

try {
   while (true){
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
       for (ConsumerRecord<String, String> record : records) {                        
            System.out.println(record.value());
            System.out.println(record.key());
       }
        consumer.commitAsync();
    }
}catch (Exception e){+
        e.printStackTrace();
    System.out.println("记录错误信息:"+e);
}finally { 
   try {
        consumer.commitSync();
    }finally {
        consumer.close();
    }
}

8、封装消息的方式

利用阿里fastJSON的方式来进行消息的发送与消费

生产者

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

Map<String, Object> map = new HashMap<>();
                map.put("articleId", wmNews.getArticleId());
                map.put("enable", dto.getEnable());
kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));

消费者文章来源地址https://www.toymoban.com/news/detail-808682.html

#controller
 @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message){
        if (message != null){
            Map map = JSONObject.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
        }
    }


#Service
public interface ApArticleConfigService extends IService<ApArticleConfig> {

    void updateByMap(Map map);
}

#ServiceImpl
     @Override
    public void updateByMap(Map map) {

        Object articleId = map.get("articleId");
        Object enable = map.get("enable");
        Boolean isDown = true;
        if (enable.equals(1)){
            isDown = false;
        }
        Wrappers.<ApArticleConfig>lambdaUpdate().
                set(ApArticleConfig::getIsDown, isDown).
                eq(ApArticleConfig::getArticleId, articleId);
    }

到了这里,关于一篇搞定Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot+Kafka实战生产级Kafka消费组

    作者:禅与计算机程序设计艺术 Kafka是一个开源分布式消息系统,最初由LinkedIn开发,之后成为Apache项目的一部分。Kafka主要用于大数据实时流处理,具有低延迟、高吞吐量等特点。本文将会从基本概念、术语说明、原理及应用场景三个方面对Kafka进行详细介绍。 Kafka作为一个

    2024年02月10日
    浏览(34)
  • Spring for Apache Kafka Deep Dive – Part 2: Apache Kafka and Spring Cloud Stream

    On the heels of part 1 in this blog series, Spring for Apache Kafka – Part 1: Error Handling, Message Conversion and Transaction Support, here in part 2 we’ll focus on another project that enhances the developer experience when building streaming applications on Kafka: Spring Cloud Stream. We will cover the following in this post: Overview of Spring Clo

    2024年02月19日
    浏览(43)
  • 在Spring Boot微服务集成spring-kafka操作Kafka集群

    记录 :461 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群。使用KafkaTemplate操作Kafka集群的生产者Producer。使用@KafkaListener操作Kafka集群的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details

    2024年02月10日
    浏览(50)
  • Spring Boot 整合 Kafka

    环境:自行创建 Spring Boot 项目,添加测试依赖,并启动 Zookeeper 和 kafka 服务。 注意:Zookeeper 默认好像占用 8080 端口,自己注意端口占用问题。 1. 添加依赖 2. 添加配置 3. 创建消息生产者 4. 创建消息消费者 5. 消息发送测试

    2023年04月11日
    浏览(36)
  • spring cloud steam 整合kafka 进行消息发送与接收

    spring cloud steam : Binder和Binding Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于

    2024年02月10日
    浏览(41)
  • Spring Boot集成Kafka详解

    Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。 1. 添加依赖 首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。 2. 配置Kafka 接下来,

    2024年02月09日
    浏览(40)
  • Spring Boot Kafka Example

    作者:禅与计算机程序设计艺术 Kafka是一个分布式消息系统,它可以实现消息的持久化、高并发量处理以及实时的可靠传输。相比于其他消息队列中间件(例如RabbitMQ、ActiveMQ),其最大的优点在于它提供的跨越语言的API支持,支持多种编程语言的客户端。作为一种轻量级的分

    2024年02月07日
    浏览(42)
  • spring boot配置双Kafka方法

    第一步:application.yml的配置 第二步:配置config 注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行 第三步: 直接在你要用到的类中直接引用就行。 跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有

    2024年02月11日
    浏览(40)
  • 轻松上手 Spring Boot & Kafka 实战!

    2、解压包 tar -zxvf kafka_2.11-1.0.0.tgzmv kafka_2.11-1.0.0 kafka1mv kafka_2.11-1.0.0 kafka2mv kafka_2.11-1.0.0 kafka3 3、创建ZK集群 修改ZK配置文件:kafka1-3/config/zookeeper.properties分别修改对应的参数。 dataDir=/usr/local/kafka/zookeeper1 dataLogDir=/usr/local/kafka/zookeeper/log clientPort=2181 maxClientCnxns=0 tickTime=2000

    2024年04月12日
    浏览(43)
  • 在Spring Boot微服务集成Kafka客户端(kafka-clients)操作Kafka

    记录 :459 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.ne

    2024年02月12日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包