Kafka 基础整理、 Springboot 简单整合

这篇具有很好参考价值的文章主要介绍了Kafka 基础整理、 Springboot 简单整合。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

定义:

  • Kafka 是一个分布式的基于发布/订阅默认的消息队列
  • 是一个开源的分布式事件流平台,被常用用于数据管道、流分析、数据集成、关键任务应用

消费模式:

  • 点对点模式 (少用)
    消费者主动拉取数据,消息收到后清除消息
    Kafka 基础整理、 Springboot 简单整合
  • 发布/订阅模式
    生产者推送消息到队列,都消费者订阅各自所需的消息
    Kafka 基础整理、 Springboot 简单整合

基本概念:

  • Producer: 消息生产者
  • Consumer: 消费者
  • Consumer: Group 消费者组,消费者组id相同得消费者为一个消费者组;一个消费者也为一个消费者组去消费
  • Broker: kafka服务器
  • Topic :消息主题, 数据分类
  • Partition :分区,一个Tpoic 有多个分区组成
  • Replica : 副本,每个分区对应多个副本
  • Leader:副本里包含leader、follower ;生产以及消费只针对 leader

生产者发送流程:

  • producer -> send(producerRecord) -> interceprots 拦截器 -> Serializer 序列化器 -> Partitioner 分区器
  • 当数据累积到 batch.size之后,sender才会发送数据;默认16k
  • 如果数据迟迟未达到batch.size , sender等待linger.ms设置的时间,到了之后就会发送数据。单位ms.默认值 0ms,标识没有延迟
  • compression.type 数据压缩方式
  • RecordAccumulator 缓冲区大小,默认32m
  • 应答模式ack
    • 0: 生产者发送数据后,不需要等待数据应答
    • 1:生产者发送过来的数据,Leader收到数据后应答
    • 1:all leader与其它所有节点收齐数据后应答Kafka 基础整理、 Springboot 简单整合

消费大概逻辑:

  • Kafka 基础整理、 Springboot 简单整合

消费者组(Consumer Group (CG)):

  • groupid相同的消费者形成一个消费者组
  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的一个消费者消费
  • 消费者组之间互不影响
  • 当消费者组的数量,大于分区数,则会有闲置
  • coordinator:辅助实现消费者组的初始化分区的分配
    • 每个节点有个coordinator, 通过 groupid % 50,选择出 coordinator 节点 50为 _consumer_offset 的分区数
    • 1%50 = 1 , _consumer_offset 的 号分区上的 coordinator 则为 leader
    • coordinator 再消费者组中随机选择一个 consumer 成为leader,由leader 制定消费计划,让后返回给 coordinator ,再由coordinator 来把消费技化 分配给其它消费者
    • coordinator 与消费者的心跳保持时间 3秒45秒 超时 - 会移除消费者,触发再平衡
    • 消费者消费时间过长,默认 5分钟 - 会移除消费者触发再平衡

消费流程:

  • 创建消费者网络连接客户端 ConsumerNetworkClient,与kafka交互
  • 消费请求初始化:每批次最小抓取大小、数据未达到超时 时间 500ms 、抓取数据大小上限
  • 发送消费请求 -》onSuccess() 回调, 拉取数据 -》 按批次放入消息队列
  • 消费者从 消息队列每批次消费数据 (500条) -》反序列化 -》拦截器 -》 处理数据
  • Kafka 基础整理、 Springboot 简单整合

消费计划(分区分配策略)默认 Range + CooperativeSticky:

  • Range:针对每一个topic,对topic分区排序、消息者排序,通过分区数 / 消费者数,决定每个消息者消费几个分区,除不尽的前面的消费者多消费。 容易产生数据倾斜
    Kafka 基础整理、 Springboot 简单整合
  • RoundRobin:轮询分区策略,针对所有topic ,把所有topic的分区和消费者列出来,按照hashcode进行排序,通过轮询算法把分区分配给消费者
  • Sticky :黏性 (执行新的分配的时,尽量靠近上次的分配结果),首先回尽量的均匀,且随机分配分区到消费者
  • CooperativeSticky:协作者黏性,Sticky 的策略相同,但支持合作式再平衡,消费者可以继续从没有被重新分配的分区消费

offset 位移: 是标记消费消费位置

  • <0.9 : 是维护在 zookeeper中
  • 0.9 之后:offset 维护在一个内置的 topic :_consumer_offsets 中
  • 采用 key - value 方式存储数据,key:groupid +topic + 分区号
  • offset 自动提交:默认每5秒自动提交offset ,默认就是 true
  • offset 手动提交:消费的时候,手动提交offset
    • 同步:等待提交offset成功,再消费下一条
    • 异步:不等待,直接消费,失败后没有重试机制
  • 指定offset 消费:
    • earliest: 自动将偏移量重置为最早的偏移量 --from-beginning
    • latest (默认): 自动将偏离量充值为最新偏移量
    • nono: 如果未找到消费者组的先前偏移量,则向消费者抛出异常。
//设置自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//自动提交时间 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000");
//offset 手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

KafkaConsumer kafkaConsumer = new KafkaConsumer<String,String>(properties);
//定义主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
//订阅
kafkaConsumer.subscribe(topics);
while (true){
    ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    if (CollectionUtil.isNotEmpty(consumerRecords)){
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.println(record);
        }
    }
    //手动提交offset
    kafkaConsumer.commitAsync();
}

指定时间消费:

 //查询对应分区
 Set<TopicPartition> partitions = kafkaConsumer.assignment();

  //保证分区分配方案定制完毕
  while (partitions.size()==0){
      kafkaConsumer.poll(Duration.ofSeconds(1));
      partitions=kafkaConsumer.assignment();
  }
  //把时间转换成对应的 offset
  Map<TopicPartition,Long> map = new HashMap<>(6);
  Map<TopicPartition,Long> offsetmap = kafkaConsumer.offsetsForTimes(map);
  for (TopicPartition topicPartition : partitions) {
      //一天前
      offsetmap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
  }
  Map<TopicPartition, OffsetAndTimestamp> offsetsForTimeMap = kafkaConsumer.offsetsForTimes(offsetmap);
  for (TopicPartition partition : partitions) {
      OffsetAndTimestamp timestamp = offsetsForTimeMap.get(partition);
      kafkaConsumer.seek(partition,timestamp.offset());
  }

kafka 文件存储机制 :

  • Topic 是逻辑上的概念,partition是物理上的概念, 每个partition对应一个log文件。该log文件中存储的就是 producer生产的数据
  • Producer生产的数据,会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低,kafka采取了分片和索引机制
  • 每个partition分为多个 segment,每个segment包含, .index .log .timeindex .snapshot 文件
  • 这些文件位于一个文件夹下,该文件夹命名规则为:topic名称+分区序号 first-0
    Kafka 基础整理、 Springboot 简单整合
    Kafka 基础整理、 Springboot 简单整合
  • 稀疏索引:大约每往log文件写入 4kb数据,会往index文件写入一条索引。
    • index文件中保存的 odffset是相对offset,这样能确保 offset得值所占空间不会过大,因此能将offset得值控制在固定大小

文件清除、压缩策略:

  • kafka 默认日志保存时间为 7 天
  • 压缩策略:compact,对应相同key的value,只保留最新的一个版本。

kafka 高效读写:

  • Kafka 本身是分布式集群,可以采用分区技术,并行度高
  • 读数据采用稀疏索引,可以快速定位要消费得数据
  • 顺序写磁盘,kafka得producer生产数据,要写入log文件中,写得过程是一直追加到文件末端,为顺序写
  • 零拷贝: Kaka的数据加工处理操作交由Kaka生产者和Kaka消费者处理。Kaka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。
  • 页缓存: Kaka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。

常用脚本命名:

  • topic 相关命令
  • 查询topic列表 :sh kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 创建topic (名称:first 分区:1个 副本 3个)副本数量不能超过集群数量
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 1 --replication-factor 3
  • topic 信息
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
  • 修改topic 分区数(只能增加)
    • sh kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe --partitions 3
  • 生产消息:
    • sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
  • 消费消费:
    • sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
    • sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning

Spring boot 简单整合:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>
server:
  port: 8200

spring:
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher
  application:
    name: @artifactId@
  kafka:
    bootstrap-servers:
      - 192.168.1.250:32010
    # 生产配置
    producer:
      #序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 10 #sender 等待事件
         #ssl认证配置相关
#        sasl.mechanism: PLAIN
#        security.protocol: SASL_PLAINTEXT
#        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
      #缓存区大小 32m
      buffer-memory: 33554432
      #批次大小 16k
      batch-size: 16
      # ISR 全部应答
      #acks: -1
      #事务ID前缀 ,配合 @Transactional ,保证多个消息的原子性
      #transaction-id-prefix: "transaction-id-xx"
    #消费配置
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #group-id: xiaoshu-1
      enable-auto-commit: false
      # 从最早消息开始消费,但是消费后,会记录offset、相同 group-id不会再次消费 
      # offset 是针对每个消费者组
      auto-offset-reset: earliest
      #批量消费,每次最多消费多少条
      #max-poll-records: 50
       #ssl认证配置相关
#      properties:
#        sasl.mechanism: PLAIN
#        security.protocol: SASL_PLAINTEXT
#        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";

    listener:
      # 手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual
      #批量消费,配合 @KafkaListener - batch="true"
      #type: batch

生产:

	@Resource
    private KafkaTemplate<String,String> kafkaTemplate;
    //@Transactional(rollbackFor = RuntimeException.class),配合 ack配置 实现多条消息发送,原子性
    @ApiOperation(value = "推送消息到kafak")
    @GetMapping("/sendMsg")
    public String sendMsg(String topic,String msg){
        kafkaTemplate.send(topic,msg).addCallback(success -> {
            if (success==null){
                System.out.println("消息发送失败");
                return;
            }
            // 消息发送到的topic
            String topicName = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
        return "ok";
    }

消费:文章来源地址https://www.toymoban.com/news/detail-434853.html

@Configuration
public class KafkaConsumer {

    private static final String TOPIC_DLT=".DLT";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 每个分区由消费者组种得一个消费者消费,每个消费者独立
     * 分区 -》 消费 、2分区2个消费监听
     * @param record
     * @param consumer
     */
    @KafkaListener(groupId = "group-1", topicPartitions ={ @TopicPartition(topic = "four",partitions = {"0"})},batch = "false")
    public void consumerTopic1(ConsumerRecord<String, String> record, Consumer consumer){
        String value = record.value();
        String topic1 = record.topic();
        long offset = record.offset();
        int partition = record.partition();
        try {
            log.info("收到消息:"+value+"topic:"+topic1+"offset:"+offset+"分区"+partition);
            //TODO 异常,推送到 对应死信 ↓
            //int i=1/0;
        } catch (Exception e) {
            System.out.println("commit failed");
            kafkaTemplate.send(topic1+TOPIC_DLT,value);
        } finally {
            consumer.commitAsync();
        }
    }

    @KafkaListener(groupId = "group-1", topicPartitions ={ @TopicPartition(topic = "four",partitions = {"1"})},batch = "false")
    public void consumerTopic2(ConsumerRecord<String, String> record, Consumer consumer){
        String value = record.value();
        String topic1 = record.topic();
        long offset = record.offset();
        int partition = record.partition();
        try {
            log.info("收到消息:"+value+"topic:"+topic1+"offset:"+offset+"分区"+partition);
            //TODO 异常,推送到 对应死信 ↓
            //int i=1/0;
        } catch (Exception e) {
            System.out.println("commit failed");
            kafkaTemplate.send(topic1+TOPIC_DLT,value);
        } finally {
            consumer.commitAsync();
        }
    }

}

	/**
     * 监听 topic1 ->转发到 topic2
     */
    @KafkaListener(topics = {"topic1"},groupId = "group-4")
    @SendTo("topic2")
    public String onMessage7(ConsumerRecord<?, ?> record) {
        return record.value()+"-转发消息";
    }

    @KafkaListener(topics = {"topic2"},groupId = "group-5")
    public void onMessage8(ConsumerRecord<?, ?> record) {
        System.out.println("收到转发消息"+record.value());
    }

到了这里,关于Kafka 基础整理、 Springboot 简单整合的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 整合 SpringBoot

    1. 添加依赖 2. 配置文件 application.properties 3. 代码实现发送消息 1. 添加依赖 2. 配置文件 applicatio.properties  3. 代码实现消费消息 首先在 kafka 节点上创建 topic: 打开kafka节点服务器的终端,输入以下命令: 一些常用命令: 编写测试代码:ApplicationTests.java

    2024年02月07日
    浏览(43)
  • springboot整合kafka-笔记

    这里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12 测试发送kafka消息-控制台日志

    2024年02月12日
    浏览(45)
  • springboot整合kafka入门

    producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。 consumer: 消费者,每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。创建消费者时,

    2024年02月07日
    浏览(46)
  • 【SpringBoot系列】SpringBoot整合Kafka(含源码)

    前言 在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。 它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。 在众多的消息队列中,Kafka 是一个非常出色的选择。 它能够处理大量的实时数据,并且提供了强大的持久化能力。 在本

    2024年02月05日
    浏览(43)
  • 什么是kafka,如何学习kafka,整合SpringBoot

    目录 一、什么是Kafka,如何学习 二、如何整合SpringBoot 三、Kafka的优势   Kafka是一种分布式的消息队列系统,它可以用于处理大量实时数据流 。学习Kafka需要掌握如何安装、配置和运行Kafka集群,以及如何使用Kafka API编写生产者和消费者代码来读写数据。此外,还需要了解Ka

    2024年02月10日
    浏览(39)
  • Kafka入门(安装和SpringBoot整合)

    app-tier:网络名称 –driver:网络类型为bridge Kafka依赖zookeeper所以先安装zookeeper -p:设置映射端口(默认2181) -d:后台启动 安装并运行Kafka, –name:容器名称 -p:设置映射端口(默认9092 ) -d:后台启动 ALLOW_PLAINTEXT_LISTENER任何人可以访问 KAFKA_CFG_ZOOKEEPER_CONNECT链接的zookeeper K

    2024年02月11日
    浏览(81)
  • 实战:彻底搞定 SpringBoot 整合 Kafka

    kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。 除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。

    2024年02月10日
    浏览(44)
  • springboot整合kafka多数据源

    在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。 单机的情况 如果是单机的kafka我们直接通过springboot自

    2024年02月13日
    浏览(49)
  • springboot整合ELK+kafka采集日志

    在分布式的项目中,各功能模块产生的日志比较分散,同时为满足性能要求,同一个微服务会集群化部署,当某一次业务报错后,如果不能确定产生的节点,那么只能逐个节点去查看日志文件;logback中RollingFileAppender,ConsoleAppender这类同步化记录器也降低系统性能,综上一些

    2024年02月15日
    浏览(42)
  • SpringBoot-Learning系列之Kafka整合

    SpringBoot-Learning系列之Kafka整合 本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。 消息系统 主要应用场景 流量消峰(秒杀 抢购)、应用解耦(核心业务与非核心业务之间的解耦) 异步处理、顺序处理 实时数据传输管道 异构语言架构系

    2024年02月09日
    浏览(70)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包