Kafka及Kafka消费者的消费问题及线程问题

这篇具有很好参考价值的文章主要介绍了Kafka及Kafka消费者的消费问题及线程问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Kafka中的基本信息

kafka中topic、broker、partition、及customer和producer等的对应关系

  1. Topic:是 Kafka 消息发布和订阅的基本单元,同时也是消息的容器。Topic 中的消息被分割成多个分区进行存储和处理。

  2. Partition:是 Topic 分区,将 Topic 细分成多个分区,每个分区可以独立地存储在不同的 Broker
    中,从而增加了消息的并发性、可扩展性和吞吐量。

  3. Broker:是 Kafka 集群中的一个或多个服务器实例,每个 Broker 可以存在于一个或多个 Topic
    的发布或订阅者列表中,并且可以跨集群多个 Brokers 进行消息路由和数据副本工作。

  4. Producer:是指向 Kafka Topic 发送消息的客户端应用程序。Producer 通过向 Broker 发送消息来向 Kafka Topic 发布数据。

  5. Consumer:是从 Kafka Topic 订阅消息的客户端应用程序。Consumer 通过消费者群组的方式订阅一个 Topic,并从 Broker 中读取消息进行消费。

  6. Consumer Group:是指一个或多个 Consumer 实例的集合,共同订阅同一个 Topic
    并共同消费其中的消息。Consumer Group 可以增加消费的吞吐量,并且可以实现负载均衡和故障自动转移。

  7. Offset:是指在 Partition 中每个消息的唯一标识符。每条消息都有一个 offset,Kafka 使用这个 offset 来保证顺序和消息传递的可靠性。

  8. Replication:是指在 Kafka 副本模式下所有数据被复制到多个 Broker,以提高数据的冗余性和可用性。

  9. Broker 集群:是指由多个 Broker 组成的 Kafka 集群,用于提供高可用性和可扩展性的数据流处理和分发服务。Kafka 集群中的每个 Broker 都是具有一定功能的独立工作单元。

Window安装配置kafka和zookeeper并将其加入服务的方式

  1. 在kafka目录下的bin/windows文件下,在当前位置打开cmd,使用命令行启动zookeeper和kafka。
    先启动zookeeper,在启动kafka

.\zookeeper-start.bat .\config\zookeeper.properties
.\kafka-server-start.bat .\config\server.properties

  1. 使用nssm安装到服务后启动
    进入nssm官网下载nssm.exe,地址见elk安装。
    注意事项:配置文件properties,写在parameter处。

二、 Kafka消费者的消费问题及线程问题

kafka中不同topic使用同一个Group Id会出现的问题分析

在 Kafka 中,消费者组(consumer group)是一组逻辑上相同的消费者,它们共同消费订阅的 topic 中的消息。如果不同 topic 使用同一个 Group ID,那么它们将视为*同一组消费者,相当于每个消费者实例只能消费组中的一个主题,而其他主题的分区将不会被消费。同时,如果消息被放置在某个主题的分区中,在该主题下任何消费者组中的消费者都有可能消费到该消息,这可能导致数据重复消费或者数据消费不均,会降低整个消费群组的消费效率。因此,在实际应用中,应该为每个 topic 创建一个独立的 Group ID,以确保消费者能够正确地消费对应 topic 下的所有消息。

高效的消费一个Topic

  1. 批量消费:通过调整消费拉取的大小,可以将单次拉取到的消息量增加到一个较大的数量,从而减少通信开销和网络带宽使用,提高吞吐量。
  2. 并行化消费:在拥有多个消费者实例的同一消费者组中,可以将每个消费者实例分配到不同的分区上,以实现并行消费。分区分配可以使用 Kafka 的自动分区分配功能。
  3. 增量消费:消费者可以记录消费的 offset,下次消费从上一次消费到的 offset 开始,避免重复消费。
  4. 合理的配置和优化:优化 Kafka 集群的配置,如调整未消费消息的保留时间,通过优化消费者端的参数设置,让消费者能够及时地拉取消息并处理。
  5. 使用高级 API:Kafka 提供了 Consumer API 和 Streams API,可以适应不同的消费场景。 Consumer API 提供了一种较低级别的消费方式,这种方式需要消费者自己管理 offset、分区、并发等。 Streams API 则提供了一种更高级别的消费方式,这种方式封装了很多细节,比如处理流式数据的能力、失败恢复以及自我管理等。使用 Streams API 可以显著地加快消费速度。

通过对批量消费、并行化消费、增量消费、优化配置和使用高级 API 等方面的优化,可以高效地消费 Kafka 中的一个 topic。

kafka避免重复消费的思路

在 Kafka 中,为了避免重复消费消息,每个消费者会维护一个 Offset 的值,用来记录自己已经消费了哪些消息。Kafka 提供了三种 Offset 维护的方式,分别是:

  1. 手动维护 Offset:在这种方式下,消费者需要手动记录 offset 的值,并在消费结束后保存这个 offset,用于下一次消费的起点。
  2. 自动提交 Offset:在这种方式下,Kafka 会自动定时提交 offset 的值,但由于自动提交是异步的,因此如果在消息消费期间消费者异常退出或者重启,就会导致 offset 丢失或重复提交。
  3. 手动+自动维护 Offset:在这种方式下,除了内部维护的消费 offset 之外,还需要手动维护一个“checkpoint”,当消息消费成功后,将消费的 offset 保存在“checkpoint”中,以便于在消费者重启或异常退出后重新消费消息。

kafka中Offset不更新的情况和处理思路

  1. 消费者处理消息的时间太长,导致在下一次消费开始时,Offset 还没有被更新。
  2. 消费者的重启或者异常退出导致 Offset 没有提交或者提交不完整。

kafka如何避免重复消费,offset不更新的情况

  1. 缩短消息处理的时间,以便在下一次消费开始时,Offset 已经更新。
  2. 在消费者退出时,将 Offset 手动提交。
  3. 使用事务来实现消息的消费和 Offset 提交,并始终在一起提交,这样即使在消费者异常退出时,就可以通过事务回滚来避免重复消费。

三、消费时的数据库与线程池

springboot中所有涉及到数据库操作的程序使用同一个线程池,如何避免数据库Too Many Connections错误

使用同一线程池会带来一个问题,就是当线程池内的线程数量过多时,可能会导致数据库因为连接数过多而出现 Too Many Connections 错误。为了避免这个问题,可以采用以下措施:

  1. 将线程池的最大连接数设置为合理的值。通过合理设置最大连接数,保证线程池内线程数量的合理性,并且避免连接过多导致数据库出现问题。
  2. 使用数据库连接池来管理数据库连接。比如,Spring Boot 中默认使用的连接池是 HikariCP 连接池,它能够自动的管理连接的数量,预防连接池出现 Too Many Connections 错误。
  3. 确保每个程序正确关闭数据库连接。无论是使用连接池还是手动创建连接,都必须正确的关闭连接,以便释放资源和避免连接池出现错误。

因为线程问题,数据库遇“1040:Too Many Connections” 错误解决方案

  1. 增加数据库连接数限制:可以在 MySQL 数据库中设置最大连接数限制,修改 My.cnf 文件,在 [mysqld] 下面加上如下参数:max_connections = 500
  2. 检查访问数据库的代码:检查在程序中是否有未关闭连接的情况,如果有,需要手动关闭连接来释放资源。
  3. 减少线程池数量:尝试减少线程池中线程数的数量,可以通过将线程池的 coreSize 或者 maximumPoolSize 参数设置为较小的值,以限制线程的数量。
  4. 使用连接池:可以使用连接池来实现对数据库连接的管理,比如使用 HikariCP、Druid、C3P 等开源的连接池工具,可以自动地管理连接的数量,避免过多的连接数。
  5. 使用分布式数据库:如果仍然存在高并发场景需要访问数据库,并且连接数过多的情况,可以考虑使用分布式数据库,将数据库中的数据进行分片存储,每个分片都使用单独的数据库实例,从而降低单个数据库的连接数量,提高数据库的可扩展性和性能。

通过检查访问数据库的代码、增加数据库连接数限制、使用连接池等多种方式来优化和改进数据库连接的管理。

kafka基础学习

一、Wget的用法

wget -O "rename" "url" //下载文件并重命名
wget "url"			//直接下载文件
wget -c "url"		//继续下载因为网络中断停止下载的文件
wget -b "url"		//后台下载文件
wget –spider "url"	//测试该网址的下载速度

一、消息队列的两种模式

1.点对点模式
消费者主动拉取数据,消息收到后消除。
2.发布/订阅模式
可以有多个topic主题(浏览、点赞、收藏、评论等)
消费者消费数据之后,不删除数据
每个消费者相互独立,都可以消费到数据
有定期清理规则,与消费者无关

二、kafka基础架构(broker,partition和topic)

1.为方便扩展,并提高吞吐量,一个topic分为多个partition
2.配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
3.为提高可用性,为每个partition增加若干副本,类似NameNode HA
4.ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK

三、zookeeper与Kafka的关系

必须在kafka全部关闭后才能关闭zookeeper,否则只能kill -9

四、注意事项

1.消费者只会消费建立连接后的数据
2.分区数只能增加,不能减少
3.不能通过命令行修改分区副本

五、工具

1.服务器配置文件分发工具
xsync
2.java进程查看工具
jps

六、具体指令

1.查看所有topic列表

	kafka-topics.bat --bootstrap-server localhost:9092  --list 

2.查看topic具体信息

	kafka-topics.bat --bootstrap-server localhost:9092  --topic test --describe  

3.生产者发送数据

	kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test	

4.消费者接收数据

	kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test  

5.消费者接收历史数据

	kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test  --from-brginning 

七、思考

为什么kafka不使用java自带的serializer

八、生产者的组成

main线程
拦截器
序列化器
分区器-双端队列,内存池(内存的创建和销毁)

九、生产者DQuene的sender

数据积累,当数据量达到batch.size,才发送数据,batch.size默认大小是16K
时间积累,当数据量未达到batch。size的大小,sender等待linger.ms设置的大小,单位为ms,默认0ms,表示无延迟

十、分区选择

1.自己指定分区,填入指定的分区数
2.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
3.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition (黏性分区器),会随机选择一个分区,并尽可一直使用该分区,待该分区的batch已满或者已完成(即),Kafka再随机一个分区进行使用(和上一次的分区不同)。

十一、提升生产者的吞吐量

1.增大缓冲区
2.增加linger.ms
3.数据压缩snappy,zip等

十二、生产者的消息发送是否会丢失,即生产者的数据可靠性,也可能出现数据重复

通过对ACK应答级别进行设置,保证数据的可靠性
ACK的应答级别:-1,0,1
-1:生产者发送的数据,不需要等数据落盘应答
0:生产者发送的消息,Leader收到数据后应答
1:生产者发送过来的数据,leader和ISR队列里面所有节点收齐数据后应答
数据的可靠性保证: ACK级别为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
应答级别的可靠性排序
-1>1>0

数据重复出现的情况
在同步完成,但是未应答时leader挂了,重复选举后又对数据进行同步
最少发送一次即ACK级别为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
最多一次即ACK级别为0

十四、幂等性

但是,要保证数据只发送一次,不多不少,即幂等性
Kafka生产者的幂等性是指,当生产者尝试重新发送之前已经发送过的消息时,Kafka可以保证这些消息只会被写入一次,而不会被写入多次。这种机制的实现可以避免以下场景:
	网络中断,导致消息重发。
	应答超时或者失败,导致消息重发。
	重复发送同一消息,导致数据重复。
Kafka实现幂等性的方式是通过给每一条消息分配一个固定的序列号,这个序列号会被放入消息的header中进行传输。
当该消息被服务端接收并成功写入到Kafka分区中时,该序列号会被标记为已处理,以后再次发送相同序列号的消息时,生产者会在header中携带该消息曾经分配到的序列号,服务端就会根据序列号判断该消息是否已经被处理过,如果该序列号已经被标记为已经处理,则服务端会忽略该消息,从而避免数据重复写入。
生产者幂等性的实现对于数据一致性极为重要的场景非常有用,它可以避免生产者重复发送数据等问题,从而提高了系统的稳定性和可用性。

十五、kafka生产者的事务

Kafka事务是指多个Kafka消息的原子性批量提交,即所有消息要么全部提交成功,要么全部回滚。Kafka 0.11版本开始提供事务及ACID语义支持,Kafka 0.11版本之前没有提供对事务的正式支持。

Kafka的事务机制主要基于消息生产者API,通过引入两个新API:beginTransaction和commitTransaction来实现事务支持。startTransaction 用于在生产者端启动事务,beginTransaction则代表由应用程序开始了一批消息的处理。在所有消息处理完毕,确认消息发送成功之后,就可以调用 commitTransaction 方法完成事务。在调用 comitTransaction 方法时, Kafka会将所有成功发送的消息一起提交,如果有任何一个消息发送失败,则整个事务都会失败。
在事务提交之前,消息生产者将所有消息缓存在一个暂存池中,这个暂存池就是所谓的批次(batch),只有当消息发送成功并被确认后,批次中的消息才被视为全部发送成功。事务的过程中,消息的发送被视为不可见。只有当事务提交成功后才会对其他消费者可见。

Kafka 事务处理非常适合需要确保消息传输的可靠性,比如在分布式事务的场景下,可以保证分布式业务操作的可靠性。Kafka 事务的机制可以让我们以数据库事务的方式保证消息的可靠传输。

十六、数据库事务和kafka事务的区别

1.范围不同:Kafka的事务处理的对象是消息数据,而数据库的事务处理的对象是数据表中的数据。

2.原子性实现方式不同:Kafka的事务实现的原子性是通过消息的批量发送机制实现的,而数据库的事务是通过原子性提交或回滚操作来实现的。

3.并发性实现方式不同:Kafka的事务是基于分布式协调方式实现的,而数据库的事务是基于锁定机制实现的。

4.目的不同:Kafka的事务主要用于保证消息传输的可靠性,而数据库的事务主要用于保证数据操作的完整性和一致性。

5.调用方式不同:Kafka的事务是由生产者组织事务,通过生产者API的方式实现,而数据库的事务是由应用程序组织事务,通过数据库事务操作语句进行实现。
Kafka的事务和数据库的事务有很多相似之处,都是用来保证数据操作或传输的可靠性和一致性。但是,由于它们的应用场景和实现方式的不同,它们在具体实现上也有很大的差异。

十七、数据乱序

在单个分区中,数据是有序的。但是多个分区中数据的传输是无序的,消费者自行排序

十八、单分区有序的设置

在kafka 1.x前保证数据单分区有序,无需考虑幂等性
max.in.flight.requests.per.connection = 1
在kafka 1.x后保证数单分区有序的设置得考虑幂等性,最近的几个元数据都是有序的。
幂等性未开启:max.in.flight.requests.per.connection = 1
幂等性开启:max.in.flight.requests.per.connection = 5,必须设置小于等于5.

十九、Kafka中AR、OSR、ISR的区别

在Kafka中,AR、OSR和ISR都是与副本相关的术语。其中,AR是Assigned Replicas的缩写,指的是每个partition下所有副本(replicas)的统称;ISR是In-Sync Replicas的缩写,是指副本同步队列,ISR是AR中的一个子集;OSR是Out-of-Sync Replicas的缩写,是指与leader副本保持一定程度同步但未加入同步队列的副本,不包括leader副本。

AR=ISR+OSR。正常情况下,所有的follower副本都应该与leader副本保持同步,即AR=ISR,OSR集合为空。

二十、Kafka中的Follow故障

LEO是Last End Offset的缩写,指的是当前replica存的当前最大offset的下一个值;HW是High Watermark的缩写,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

在Kafka中,HW是分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW。所以,HW、LW是分区层面的概念;而LEO、LogStartOffset是日志层面的概念;LSO是事务层面的概念。文章来源地址https://www.toymoban.com/news/detail-627145.html

到了这里,关于Kafka及Kafka消费者的消费问题及线程问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka实战-消费者offset重置问题

    背景:当app启动时,会调用 “启动上报接口” 上报启动数据,该数据包含且不限于手机型号、应用版本、app类型、启动时间等,一站式接入平台系统会记录该数据。 生产者:“启动上报接口”会根据启动数据发送一条kafka消息,topic“xxx” 消费者:“启动处理模块”会监控

    2023年04月11日
    浏览(75)
  • kafka消费者详解,根据实际生产解决问题

    1.首先kafka每创建一个消费者就是一个消费者组,必须指定groupip 2.两个消费者组之间不相互影响,消费同一个主题的同一个分区,两个消费者组不相互影响,各自记录自己的offset 3.在开发中如果没有指定每个消费者去消费特定的分区,那么kafka默认是按照roundRobin轮询的方式分

    2024年02月10日
    浏览(35)
  • kafka Consumer 消费者使用多线程并发执行,并保证顺序消费, 第一种使用纯线程方式、第二种使用Executors线程池

    网上搜索kafka消费者通过多线程进行顺序消费的内容都不太理想,或者太过复杂,所以自己写了几个demo,供大家参考指正。         单个消费者,每秒需要处理1000条数据,每条数据的处理时间为500ms,相同accNum(客户账号)的数据需要保证消费的顺序。 1、如果1秒钟生产

    2024年02月15日
    浏览(31)
  • 【项目实战】Java 开发 Kafka 消费者

    👉 博主介绍 : 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人 Java知识图谱点击链接: 体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收藏关注下 , 不然下次找不到哟

    2024年02月16日
    浏览(30)
  • Kafka消费者异常问题解析与解决方案

    Kafka是一个分布式流处理平台,它提供了高吞吐量、容错性和可扩展性的特性。然而,有时候在使用Kafka消费者时,可能会遇到一些异常情况。本文将详细讨论几种常见的Kafka消费异常问题,并提供相应的解决方案。 问题1:消费者无法连接到Kafka集群 当消费者无法连接到Kafk

    2024年02月05日
    浏览(36)
  • 【Kafka每日一问】Kafka消费者故障,出现活锁问题如何解决?

    在Kafka中,消费者的“活锁”通常是指消费者实例持续失败并重新加入消费者组,但却始终无法成功处理消息。这种现象可能会导致消费者组不断触发重平衡(rebalance),而消息却没有被实际消费。以下是一些解决或缓解活锁问题的策略: 1. 优化消息处理逻辑 消费者可能由于

    2024年01月20日
    浏览(33)
  • 【工作中问题解决实践 十一】Kafka消费者消费堆积且频繁rebalance

    最近有点不走运,老是遇到基础服务的问题,还是记着点儿解决方法,以后再遇到快速解决吧,今天遇到这个问题倒不算紧急,但也能通过这个问题熟悉一下Kafka的配置。 正在开会的时候突然收到一连串的报警,赶忙看看是为啥 没过一会儿基础服务报警也来了 Kafka 自身的异

    2024年02月13日
    浏览(35)
  • Spring Boot中使用Kafka时遇到“构建Kafka消费者失败“的问题

    在使用Spring Boot开发应用程序时,集成Apache Kafka作为消息队列是一种常见的做法。然而,有时候在配置和使用Kafka时可能会遇到一些问题。本文将探讨在Spring Boot应用程序中使用Kafka时可能遇到的\\\"构建Kafka消费者失败\\\"错误,并提供解决方案。 错误描述: 当尝试构建Kafka消费者时

    2024年01月17日
    浏览(36)
  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(42)
  • Java实现Kafka消费者及消息异步回调方式

    Kafka 在创建消费者进行消费数据时,由于可以理解成为是一个kafka 的单独线程,所以在Kafka消费数据时想要在外部对消费到的数据进行业务处理时是获取不到的,所以就需要实现一个消息回调的接口来进行数据的保存及使用。 消息回调接口实现代码如下 Kafka消费者代码实现如

    2024年02月06日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包