【大数据】Kafka 入门简介

这篇具有很好参考价值的文章主要介绍了【大数据】Kafka 入门简介。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.什么是 Kafka

Kafka 是一个分布式流处理系统,流处理系统使它可以像消息队列一样 publish 或者 subscribe 消息,分布式提供了容错性,并发处理消息的机制。

2.Kafka 的基本概念

Kafka 运行在集群上,集群包含一个或多个服务器。Kafka 把消息存在 Topic 中,每一条消息包含键值(Key),值(Value)和时间戳(Timestamp)。

Kafka 有以下一些基本概念:

  • Producer:消息生产者,就是向 Kafka Broker 发消息的客户端。
  • Consumer:消息消费者,是消息的使用方,负责消费 Kafka 服务器上的消息。
  • Topic:主题,由用户定义并配置在 Kafka 服务器,用于建立 Producer 和 Consumer 之间的订阅关系。生产者发送消息到指定的 Topic 下,消息者从这个 Topic 下消费消息。
  • Partition:消息分区,一个 Topic 可以分为多个 Partition,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(Offset)。
  • Broker:一台 Kafka 服务器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。
  • Consumer Group:消费者分组,用于归组同类消费者。每个 Consumer 属于一个特定的 Consumer Group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset:消息在 Partition 中的偏移量。每一条消息在 Partition 都有唯一的偏移量,消费者可以指定偏移量来指定要消费的消息。

3.Kafka 分布式架构

【大数据】Kafka 入门简介,# Kafka,大数据,kafka,分布式,消息队列,数据同步
如上图所示,Kafka 将 Topic 中的消息存在不同的 Partition中。如果存在键值(Key),消息按照键值做分类存在不同的 Partition 中,如果不存在键值,消息按照轮询(Round Robin)机制存在不同的 Partition 中。默认情况下,键值决定了一条消息会被存在哪个 Partition 中。

【大数据】Kafka 入门简介,# Kafka,大数据,kafka,分布式,消息队列,数据同步
Partition 中的消息序列是有序的消息序列。Kafka 在 Partition 使用偏移量(Offset)来指定消息的位置。一个 Topic 的一个 Partition 只能被一个 Consumer Group 中的一个 Consumer 消费,同组的多个 Consumer 消费同一个 Partition 中的数据是不允许的;但是一个 Consumer 可以消费多个 Partition 中的数据。

Kafka 将 Partition 的数据复制到不同的 Broker,提供了 Partition 数据的备份。每一个 Partition 都有一个 Broker 作为 Leader,若干个 Broker 作为 Follower。所有的数据读写都通过 Leader 所在的服务器进行,并且 Leader 在不同 Broker 之间复制数据。

【大数据】Kafka 入门简介,# Kafka,大数据,kafka,分布式,消息队列,数据同步
上图中,对于 Partition 0,Broker 1 是它的 Leader,Broker 2 和 Broker 3 是 Follower。对于 Partition 1,Broker 2 是它的 Leader,Broker 1 和 Broker 3 是 Follower。

【大数据】Kafka 入门简介,# Kafka,大数据,kafka,分布式,消息队列,数据同步
在上图中,当有 Client(也就是 Producer)要写入数据到 Partition 0 时,会写入到 Leader Broker 1,Broker 1 再将数据复制到 Follower Broker 2 和 Broker 3。

【大数据】Kafka 入门简介,# Kafka,大数据,kafka,分布式,消息队列,数据同步
在上图中,Client 向 Partition 1 中写入数据时,会写入到 Broker 2,因为 Broker 2 是 Partition 1 的 Leader,然后 Broker 2 再将数据复制到 Follower Broker 1 和 Broker 3 中。

上图中的 Topic 一共有 3 个 Partition,对每个 Partition 的读写都由不同的 Broker 处理,因此总的吞吐量得到了提升。

4.配置单机版 Kafka

这里我们使用 Kafka 0.10.0.0 0.10.0.0 0.10.0.0 版本。

4.1 下载并解压包

$ wget https://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
$ tar -xzf kafka_2.11-0.10.0.0.tgz
$ cd kafka_2.11-0.10.0.0

4.2 启动 Kafka

Kafka 需要用到 Zookeeper,所以需要先启动 Zookeeper。我们这里使用下载包里自带的单机版 Zookeeper。

$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

然后启动 Kafka

$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

4.3 创建 Topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看创建的 Topic

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

4.4 向 Topic 中发送消息

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

4.5 从 Topic 中消费消息

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

5.实验

5.1 实验一:Python 实现生产者消费者

kafka-python 是一个 Python 的 Kafka 客户端,可以用来向 Kafka 的 Topic 发送消息、消费消息。

这个实验会实现一个 Producer 和一个 Consumer。roducer 向 Kafka 发送消息,Consumer 从 Topic 中消费消息。结构如下图:
【大数据】Kafka 入门简介,# Kafka,大数据,kafka,分布式,消息队列,数据同步

# producer.py
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
i = 0
while True:
    ts = int(time.time() * 1000)
    producer.send(topic="test", value=str(i), key=str(i), timestamp_ms=ts)
    producer.flush()
    print i
    i += 1
    time.sleep(1)
# consumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer("test", bootstrap_servers=["localhost:9092"])
for message in consumer:
    print message

接下来创建一个 Topic,名为 test

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

打开两个窗口中,我们在 window-1 中运行 producer.py,如下:

# window-1
$ python producer.py
0
1
2
3
4
5
...

window-2 中运行 consumer.py,如下:

# window-2
$ python consumer.py
ConsumerRecord(topic=u'test', partition=0, offset=128, timestamp=1512554839806, timestamp_type=0, key='128', value='128', checksum=-1439508774, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=129, timestamp=1512554840827, timestamp_type=0, key='129', value='129', checksum=1515993224, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=130, timestamp=1512554841834, timestamp_type=0, key='130', value='130', checksum=453490213, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=131, timestamp=1512554842841, timestamp_type=0, key='131', value='131', checksum=-632119731, serialized_key_size=3, serialized_value_size=3)
...

可以看到 window-2 中的 Consumer 成功的读到了 Producer 写入的数据。

5.2 实验二:消费组实现容错性机制

这个实验将展示消费组的容错性的特点。这个实验中将创建一个有 2 个 Partition 的 Topic,和 2 个 Consumer,这 2 个 Consumer 共同消费同一个 Topic 中的数据。结构如下所示:
【大数据】Kafka 入门简介,# Kafka,大数据,kafka,分布式,消息队列,数据同步
Producer 部分代码和实验一相同,这里不再重复。Consumer 需要指定所属的 Consumer Group,代码如下:

# consumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer("test", bootstrap_servers=["localhost:9092"], group_id="testgroup")
for message in consumer:
    print message

接下来我们创建一个 Topic,名为 Test,设置 Partition 数量为 2。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

打开三个窗口,一个窗口运行 Producer,还有两个窗口运行 Consumer。

运行 Consumer 的两个窗口的输出如下:

# window-1
$ python consumer.py
ConsumerRecord(topic=u'test', partition=0, offset=11, timestamp=1512556619298, timestamp_type=0, key='15', value='15', checksum=-1492440752, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=12, timestamp=1512556621308, timestamp_type=0, key='17', value='17', checksum=-1029407634, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=13, timestamp=1512556622316, timestamp_type=0, key='18', value='18', checksum=1544755853, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=14, timestamp=1512556624326, timestamp_type=0, key='20', value='20', checksum=2130557725, serialized_key_size=2, serialized_value_size=2)
...


# window-2
$ python consumer.py
ConsumerRecord(topic=u'test', partition=1, offset=6, timestamp=1512556617287, timestamp_type=0, key='13', value='13', checksum=-1494513008, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=1, offset=7, timestamp=1512556618293, timestamp_type=0, key='14', value='14', checksum=-1499251221, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=1, offset=8, timestamp=1512556620303, timestamp_type=0, key='16', value='16', checksum=-783427375, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=1, offset=9, timestamp=1512556623321, timestamp_type=0, key='19', value='19', checksum=-1902514040, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=1, offset=10, timestamp=1512556626337, timestamp_type=0, key='22', value='22', checksum=782849423, serialized_key_size=2, serialized_value_size=2)
...

可以看到两个 Consumer 同时运行的情况下,它们分别消费不同 Partition 中的数据。window-1 中的 Consumer 消费 Partition 0 中的数据,window-2 中的 Consumer 消费 Parition 1 中的数据。

我们尝试关闭 window-1 中的 Consumer,可以看到如下结果:

# window-2

ConsumerRecord(topic=u'test', partition=1, offset=105, timestamp=1512557514410,timestamp_type=0, key='46', value='46', checksum=-1821060627, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=1, offset=106, timestamp=1512557518428,timestamp_type=0, key='50', value='50', checksum=281004575, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=1, offset=107, timestamp=1512557521442,timestamp_type=0, key='53', value='53', checksum=1245067939, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=1, offset=108, timestamp=1512557525461,timestamp_type=0, key='57', value='57', checksum=-1003840299, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=98, timestamp=1512557494325, timestamp_type=0, key='26', value='26', checksum=-1576244323, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=99, timestamp=1512557495329, timestamp_type=0, key='27', value='27', checksum=510530536, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=100, timestamp=1512557502360,timestamp_type=0, key='34', value='34', checksum=1781705793, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=101, timestamp=1512557504368,timestamp_type=0, key='36', value='36', checksum=2142677730, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=102, timestamp=1512557505372,timestamp_type=0, key='37', value='37', checksum=-1376259357, serialized_key_size=2, serialized_value_size=2)
...

刚开始 window-2 中的 Consumer 只消费 Partition 1 中的数据,当 window-1 中的 Consumer 退出后,window-2 中的 Consumer 中也开始消费 Partition 0 中的数据了。

5.3 实验三:Offset 管理

Kafka 允许 Consumer 将当前消费的消息的 Offset 提交到 Kafka中,这样如果 Consumer 因异常退出后,下次启动仍然可以从上次记录的 Offset 开始向后继续消费消息。

这个实验的结构和实验一的结构是一样的,使用一个 Producer,一个 Consumer,主题 test 的 Partition 数量设为 1。

Producer 的代码和实验一中的一样,这里不再重复。Consumer 的代码稍作修改,这里 Consumer 中打印出下一个要被消费的消息的 Offset。Consumer 代码如下:

from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("test", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="testgroup", auto_offset_reset="earliest", enable_auto_commit=False)
consumer.assign([tp])
print "start offset is", consumer.position(tp)
for message in consumer:
    print message

在一个窗口中启动 Producer,在另一个窗口并且启动 Consumer。Consumer 的输出如下:

$ python consumer.py
start offset is 98
ConsumerRecord(topic=u'test', partition=0, offset=98, timestamp=1512558902904, timestamp_type=0, key='98', value='98', checksum=-588818519, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=99, timestamp=1512558903909, timestamp_type=0, key='99', value='99', checksum=1042712647, serialized_key_size=2, serialized_value_size=2)
ConsumerRecord(topic=u'test', partition=0, offset=100, timestamp=1512558904915, timestamp_type=0, key='100', value='100', checksum=-838622723, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=101, timestamp=1512558905920, timestamp_type=0, key='101', value='101', checksum=-2020362485, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=102, timestamp=1512558906926, timestamp_type=0, key='102', value='102', checksum=-345378749, serialized_key_size=3, serialized_value_size=3)
...

可以尝试退出 Consumer,再启动 Consumer。每一次重新启动,Consumer 都是从 offset=98 的消息开始消费的。

修改 Consumer 的代码如下,在 Consumer 消费每一条消息后将 offset 提交回 Kafka。

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

tp = TopicPartition("test2", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="testgroup", auto_offset_reset="earliest", enable_auto_commit=False)
consumer.assign([tp])
print "start offset is ", consumer.position(tp)
for message in consumer:
    print message
    consumer.commit(message.offset + 1)

启动 Consumer

$ python consumer.py
start offset is 98
ConsumerRecord(topic=u'test', partition=0, offset=98, timestamp=1512559632153, timestamp_type=0, key='824', value='824', checksum=828849435, serialized_key_size=3, serialized_value_size=3)
...
ConsumerRecord(topic=u'test', partition=0, offset=827, timestamp=1512559635164, timestamp_type=0, key='827', value='827', checksum=442222330, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=828, timestamp=1512559636169, timestamp_type=0, key='828', value='828', checksum=-267344764, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=829, timestamp=1512559637173, timestamp_type=0, key='829', value='829', checksum=1225853586, serialized_key_size=3, serialized_value_size=3)

可以看到 Consumer 从 offset=98 的消息开始消费,到 offset=829 时,我们 Ctrl+C 退出 Consumer。

我们再次启动 Consumer

$ python consumer.py
start offset is 830
ConsumerRecord(topic=u'test', partition=0, offset=830, timestamp=1512559638177, timestamp_type=0, key='830', value='830', checksum=1003305652, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=831, timestamp=1512559639181, timestamp_type=0, key='831', value='831', checksum=-361607666, serialized_key_size=3, serialized_value_size=3)
ConsumerRecord(topic=u'test', partition=0, offset=832, timestamp=1512559640185, timestamp_type=0, key='832', value='832', checksum=-345891932, serialized_key_size=3, serialized_value_size=3)
...

可以看到重新启动后,Consumer 从上一次记录的 offset 开始继续消费消息。之后每一次 Consumer 重新启动,Consumer 都会从上一次停止的地方继续开始消费。

6.总结

本文主要介绍了一下 Kafka 的基本概念,并结合一些实验帮助理解 Kafka 中的一些难点,如多个 Consumer 的容错性机制,Offset 管理。文章来源地址https://www.toymoban.com/news/detail-743971.html

到了这里,关于【大数据】Kafka 入门简介的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(42)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(50)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(46)
  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(42)
  • 【新星计划】Kafka分布式发布订阅消息系统

      目录 Kafka分布式发布订阅消息系统 1. 概述 1.1 点对点消息传递模式 1.2 发布-订阅消息传递模式 1.3 Kafka特点 1.4 kafka拓扑图 2. Kafka工作原理 2.1 Kafka核心组件介绍 2.2 Kafka工作流程分析 2.2.1 生产者生产消息过程 2.2.2 消费者消费消息过程 2.2.3 Kafka Topics 2.2.4 Kafka Partition 2.2.4 Kafka

    2024年02月08日
    浏览(44)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(37)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包