KafKa集群搭建和知识点

这篇具有很好参考价值的文章主要介绍了KafKa集群搭建和知识点。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、KafKa概述

1.1 定义

KafKa是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据试试处理领域

是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

名称 解释
Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Producer 消息生产者,向Broker发送消息的客户端
Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的

2 kafka&zookeper安装/配置/启动

2.1 kafka下载和安装

2.1.1 安装包下载
本次安装的kafka版本是3.5.0, 自带zookeeper
https://archive.apache.org/dist/kafka/3.5.0/kafka_2.12-3.5.0.tgz

# 1 下载安装包到/opt目录下
# 2 进行解压 tar -xzvf kafka_2.12-3.5.0.tgz -C /opt
2.1.2 准备集群节点
192.168.190.147 node1
192.168.190.145 node2
192.168.190.142 node3

2.2 kafka&zookeeper配置

先以node1基点为例进行配置

2.2.1 server.properties配置

先以node1基点为例进行配置

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0  # 每个kafka服务器是一个broker,如果kafka存在集群则注意填写该值

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://node1:9092  # 这里用来配置本节点的kafka服务的监听端口

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/kafka-logs  # kafka的log文件实际存储的是kafka的topic中的消息文件

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=node1:2181,node2:2181,node3:2181  # 这里用于配置zookeeper的地址和端口, 如果存在zk集群,则填写集群的多个ip:port

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
2.2.2 zookeeper.properties配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/kafka/kafka_2.12-3.5.0/zookeeper/data  # 定义zk的数据存储目录,需要手动 mkdir 
dataLogDir=/opt/kafka/kafka_2.12-3.5.0/zookeeper/log  # 定义zk的日志存储目录,需要手动 mkdir
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

#设置连接参数,添加如下配置
#为zk的基本时间单元,毫秒
tickTime=2000
#Leader-Follower初始通信时限 tickTime*10
initLimit=10
#Leader-Follower同步通信时限 tickTime*5
syncLimit=5

# #设置broker Id的服务地址
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
2.2.3 创建myid文件
在zookeeper.properties配置的数据目录下(/opt/kafka/kafka_2.12-3.5.0/zookeeper/data),创建一个文本文件myid, 内容为每个zookeeper节点的编号。因为是3个节点kafka集群,所以zookeeper集群也是3个节点,他们的编号分别是1、2、3.

经过以上三个步骤就配置好了node1节点,按照同样的方法配置node2和node3节点。

2.3 kafka&zookeeper的启动

# 注意要先启动zookeeper再启动kafka

# 启动zookeeper
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

# 启动kafka
./kafka-server-start.sh -daemon ../config/server.properties


# 停止zookeeper
./zookeeper-server-stop.sh

# 停止kafka
./kafka-server-stop.sh

3 kafka相关操作命令

3.1 创建kafka的topic

./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 1 --topic test
# 1 说明:
# --bootstrap-server 192.168.190.136:9092 表示kafka地址和端口
#--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可/ 2.20废弃!!!!!!!!!!!!!
#--replication-factor:定义分区副本数,1 代表单副本,建议为 2 
#--partitions:定义分区数 
#--topic:定义 topic 名称

3.2 查看topic列表

./kafka-topics.sh --list --bootstrap-server node1:9092
# 2 说明:
	--bootstrap-server node1:9092 表示kafka地址和端口

3.3 查看topic信息

./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test

3.4 创建kafka生产者

kafka自带了一个producer的命令客户端,可以从本地文件中读取内容,或者我们也可以一命令行中直接输入内容,并将这些内容以消息的形式送到kafka集群中。在默认情况下,没一行会被当成一个独立的消息。使用kafka的发送消息的客户端,指定发送到kafka服务器地址和topic

# 创建一个kafka控制台的生产者,创建后可在控制台直接输入信息发送到对应的topic下
./kafka-console-producer.sh --broker-list node1:9092 --topic test

3.5 创建kafka消费者

# 创建一个kafka控制台的消费者,创建后可直接取对应的topic的数据并输出
./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --from-beginning

#--from-beginning:会把主题中以往所有的数据都读取出来

3.6 查看消费者组列表

./kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

3.7 查看kafka中某一个消费者组的消费情况

./kafka-consumer-groups.sh --bootstrap-server node1 --group console-consumer-6920 --describe

# console-consumer-6920: 表示消费者组
# 返回如下
GROUP TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID HOST  CLIENT-ID
console-consumer-6920 test      0          -               12              -               console-consumer-e162ef6d-600f-45d1-8198-af7e230642a7 /192.168.190.136 console-consumer

4 zookeeper相关命令

4.1 连接zookeeper

# ./zookeeper-shell.sh zookeeper_server:port --> 连接zookeeper
./zookeeper-shell.sh node1:2181

# 不进入zookeeper执行相关指令
./zookeeper-shell.sh node1:2181 ls /                  # 查看当前 ZooKeeper 中所包含的内容
./zookeeper-shell.sh node1:2181 ls -s /			   # 查看当前节点数据
./zookeeper-shell.sh node1:2181 ls /brokers/ids       # 
./zookeeper-shell.sh node1:2181 ls /brokers/topics
./zookeeper-shell.sh node1:2181 get /brokers/ids/0

4.2 查看当前Zookeeper中所含的内容

# 查看当前Zookeeper中所含的内容
ls /

4.3 查看当前节点数据

ls -s /
# 返回如下
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
cZxid = 0x0                               # 数据节点创建时的事务 ID
ctime = Wed Dec 31 16:00:00 PST 1969 	  # 数据节点创建时的时间
mZxid = 0x0 							# 数据节点最后一次更新时的事务 ID
mtime = Wed Dec 31 16:00:00 PST 1969 	  # 数据节点最后一次更新时的时间
pZxid = 0x200000057                       # 数据节点的子节点最后一次被修改时的事务 ID
cversion = 18						    # 子节点的更改次数
dataVersion = 0						    # 节点数据的更改次数
aclVersion = 0                            # 节点的 ACL 的更改次数
ephemeralOwner = 0x0                      # 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength = 0                            # 数据内容的长度
numChildren = 12                          # 数据节点当前的子节点个数

4.4 创建节点

#创建序列化永久节点:
create -s /testnode test
#创建临时节点
create -e /testnode-temp testtemp
#创建永久节点:
create /testnode-p testp

4.5 获取节点

ls path [watch]
get path [watch]
ls -s path [watch]

4.6 修改节点

ls -s /
get -s /testnode-temp
set /testnode-temp 123
get -s /testnode-temp

4.7 监听节点

get /testnode-temp watch
set  /testnode-temp testwatch
#他会回调Watch得到触发结果

4.8 删除节点

#普通删除的命令
delete path [version]
#递归删除的命令
rmr path [version]

deleteall path

二、 知识点

1 主题Topic

​ 在逻辑的对消息的种类进行划分

2 分区partition

当Topic中的消息非常多的时候,会导致kafka的log日志文件特别大(kafka的日志文件存储的是消息)。

2.1 分区的作用

  • 可以分布式存储
  • 可以并行写

2.2 实例说明1

  • 为一个主题创建多个分区
# 创建一个名为test的Topic, 并为其创建2个partition
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 2 --topic mytest
  • 查看topic的分区信息
./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest


# 返回
root@ubuntu:/kafka/bin# ./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest
Topic: mytest   TopicId: ulEszEwcT0i3Iu5TyfIglg PartitionCount: 2       ReplicationFactor: 1    Configs: 
        Topic: mytest   Partition: 0    Leader: 2       Replicas: 2     Isr: 2
        Topic: mytest   Partition: 1    Leader: 1       Replicas: 1     Isr: 1

  • 查看kafka日志文件
当设置mytest的partition==2的时候,会在node1上创建一个mytest-1, 在node2上创建一个mytest-0. 因为kafka的是集群搭建,所以在给mytest创建分区的时候会随机分配到集群中的两个节点上。
可以看到 kafka默认的toipic:__consumer_offsets,具有50个partition,分散在node1和node2的节点上。。

# node1节点
drwxr-xr-x 29 root root 4096 Aug 10 02:21 ./
drwxr-xr-x  4 root root 4096 Aug  7 21:18 ../
-rw-r--r--  1 root root    0 Aug  7 21:18 cleaner-offset-checkpoint
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-1/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-11/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-13/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-15/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-17/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-19/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-21/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-23/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-25/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-27/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-29/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-3/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-31/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-33/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-35/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-37/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-39/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-41/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-43/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-45/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-47/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-49/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-5/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-7/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-9/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 faust-group-01-__assignor-__leader-0/
-rw-r--r--  1 root root    0 Aug  7 21:18 .lock
-rw-r--r--  1 root root    4 Aug 10 02:21 log-start-offset-checkpoint
-rw-r--r--  1 root root   88 Aug 10 02:16 meta.properties
drwxr-xr-x  2 root root 4096 Aug 10 02:18 mytest-1/
-rw-r--r--  1 root root  657 Aug 10 02:21 recovery-point-offset-checkpoint
-rw-r--r--  1 root root  657 Aug 10 02:21 replication-offset-checkpoint


# node2节点
root@ubuntu:/opt/kafka/kafka_2.12-3.5.0/bin# ll /opt/kafka/kafka-logs/
total 144
drwxr-xr-x 32 root root 4096 Aug 10 02:21 ./
drwxr-xr-x  4 root root 4096 Aug  7 21:18 ../
-rw-r--r--  1 root root    0 Aug  7 21:18 cleaner-offset-checkpoint
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-0/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-10/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-12/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-14/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-16/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-18/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-2/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-20/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-22/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-24/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-26/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-28/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-30/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-32/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-34/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-36/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-38/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-4/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-40/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-42/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-44/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-46/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-48/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-6/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-8/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 example-0/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 faust-group-02-__assignor-__leader-0/
-rw-r--r--  1 root root    0 Aug  7 21:18 .lock
-rw-r--r--  1 root root    4 Aug 10 02:21 log-start-offset-checkpoint
-rw-r--r--  1 root root   88 Aug 10 02:18 meta.properties
drwxr-xr-x  2 root root 4096 Aug 10 02:18 myid-__assignor-__leader-0/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 mytest-0/
-rw-r--r--  1 root root  703 Aug 10 02:21 recovery-point-offset-checkpoint
-rw-r--r--  1 root root  703 Aug 10 02:21 replication-offset-checkpoint
drwxr-xr-x  2 root root 4096 Aug 10 02:21 wzp-0/

2.2 实例说明2

​ kafka每个消费者会维护自己消费的主题的偏移量offset, 并把这个offset提交给kafka内部的topic: __consumer_offsets, 提交过去的时候,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic中的消息,最后就保留最新的那条数据。

​ 因为_consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的方式增大并发。

​ 通过如下公式可以选出consumer消费的offset要提交到_consumer_offsets的哪个分区。

​ 公式: hash(consumerGroupId) % __consumer_offsets主题的分区数

3 副本replication

副本是对分区的备份。在集群中,不同的副本会被部署到不同的broker上。同一个分区可以设定多个相同的副本,其中一个叫做 leader,其他叫做 follower,生产者和消费者只和 leader 交互,生产者生产的消息发送到 leader 之后,其他 follower 才能同从 leader 中同步消息。当 leader 发生故障的时候会从 follower 中选择一个称为新的leader。

Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?

Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。

Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。

# 创建 1个topic 2个分区 3个副本
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 3 --partitions 2 --topic mytest01


# 返回
root@ubuntu:/kafka/bin# ./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest01
Topic: mytest01 TopicId: MSmYL0CXStu0g_29irzH3A PartitionCount: 2       ReplicationFactor: 2    Configs: 
        Topic: mytest01 Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: mytest01 Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2

# 说明
mytest01的0分区的leader在node2节点上
mytest01的1分区的leader在node1节点上


# leader:副本里的概念
每个partition都有一个broker作为leader
生产者和消费者的读写请求都是发生在leader所在的分区,而不是副本上

# fllower: leader处理所有针对这个partition分区的读写请求。follower被动复制leader,不提供读写

10 如何保证消息的消费顺序

Kafka 通过以下机制来保证消费的顺序性:

  1. 分区: Kafka 中的每个主题可以分为多个分区,每个分区都只由一个消费者进行消费。分区是 Kafka 中实现水平扩展和并行处理的基本单位。对于同一分区内的消息,它们的顺序是有序的,所以确保每个分区只被一个消费者处理可以保证该分区内消息的顺序性。
  2. 偏移量(Offset): 每个分区中的消息都有一个唯一标识符,称为偏移量(Offset)。偏移量表示消息在分区内的顺序。消费者可以通过指定偏移量来读取分区中的特定消息。Kafka 使用偏移量来记录消费者的消费进度,并在消费者恢复或重启时从上一次的偏移量处继续消费,保证了消息的有序性。
  3. 消费者组: 多个消费者可以组成一个消费者组,并同时消费同一个主题的不同分区。当启动多个消费者时,Kafka 会自动分配分区给不同的消费者,保证每个分区只由一个消费者处理。这样,每个消费者负责消费自己所分配的分区,而不会出现多个消费者同时消费同一个分区的情况,从而保证了消费的顺序性。

需要注意的是,Kafka 只能在同一个分区内保证消息的顺序性,而不同分区之间的顺序是无法保证的。如果需要对多个分区的消息进行有序处理,可以通过将多个分区的数据合并到一个分区中,或使用其他流处理工具(如 Apache Flink 或 Apache Spark Streaming)来实现。

总结起来,Kafka 通过分区、偏移量和消费者组等机制来保证消费的顺序性。每个分区内的消息是有序的,而消费者组确保每个分区只由一个消费者处理,从而保证了消费整体上的顺序性。

11 zookeeper在kafka中的作用

用于进行broker和topic的注册文章来源地址https://www.toymoban.com/news/detail-641300.html

到了这里,关于KafKa集群搭建和知识点的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在构建分布式系统时,选择适合的消息中间件是至关重要的决策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是当前流行的消息中间件之一,它们各自具有独特的特点和适用场景。本文将对这四种消息中间件进行综合比较,帮助您在项目中作出明智的选择。 1. RabbitMQ 特点: 消息模

    2024年02月20日
    浏览(54)
  • 后端常使用的中间件知识点--持续更新

    类型 难度 mysql mysql中SQL优化:多角度分析 包学包会,sql优化全过程,刨根分析 redis 多角度剖析redis数据结构及底层实现原理、应用场景 MQ 简单大体说明RabbitMQ的使用(简单版) mybatis 使用JDBC的批量插入百万数据要多少秒 一遍就会的,从0开始在springboot上使用Mybatis对数据库进

    2024年02月13日
    浏览(38)
  • 分布式消息队列RabbitMQ-Linux下服务搭建,面试完腾讯我才发现这些知识点竟然没掌握全

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 5.修改配置文件 这里面修改{loopback_users, [“guest”]}改为{loopback_users, []} {application, rabbit, %% - - erlang - - [{description, “RabbitMQ”}, {id, “RabbitMQ”}, {vsn, “3.6.5”}, {modules, [‘background_gc’,‘delegate’,‘delegate_sup’,‘dtree’,‘file_han

    2024年04月14日
    浏览(56)
  • kafka知识点全方位讲解

    Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。 Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。 Kafka是一个分布式消息队列:

    2023年04月25日
    浏览(44)
  • Kafka 知识点学习【Kafka 学习之24问-第二十四刊】

    🏆作者简介,普修罗双战士,一直追求不断学习和成长,在技术的道路上持续探索和实践。 🏆多年互联网行业从业经验,历任核心研发工程师,项目技术负责人。 🎉欢迎 👍点赞✍评论⭐收藏 Kafka知识专栏学习 Kafka知识云集 访问地址 备注 Kafka知识点(1) https://blog.csdn.net/m

    2024年02月05日
    浏览(43)
  • 【消息中间件】详解mq消息积压

    作者简介 目录 1.产生原因 2.解决办法 2.1.事前处理机制 2.2.事中处理机制 2.3.事后处理机制 消息积压(Message Backlog)指的是在消息队列(MQ)系统中等待被处理的消息数量超过了正常的处理速度,导致消息在队列中积压堆积的情况。 消息积压的常见表现: 系统资源使用率上升

    2024年02月07日
    浏览(47)
  • 快速掌握MQ消息中间件rabbitmq

    Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies. 需求: 1.video A https://www.bilibili.com/video/BV1cb4y1o7zz?p=12vd_source=533ee415c42b820b0f4105acb4932a02 参考资料 官方文档 开源社区 博客文

    2024年02月11日
    浏览(47)
  • MQ(消息中间件)概述及 RabbitMQ 的基本介绍

    消息队列中间件是分布式系统中重要的组件,主要解决 应用解耦,异步消息,流量削锋等 问题,实现高性能,高可用,可伸缩和最终一致性架构。流量削锋 : 削减峰值压力(秒杀,抢购) MQ(Message Queue,消息队列)是典型的生产者、消费者模型。生产者不断向消息队列中

    2024年02月12日
    浏览(47)
  • 分布式消息流处理平台kafka(一)-kafka单机、集群环境搭建流程及使用入门

    kafka最初是LinkedIn的一个内部基础设施系统。最初开发的起因是,LinkedIn虽然有了数据库和其他系统可以用来存储数据,但是缺乏一个可以帮助处理持续数据流的组件。 所以在设计理念上,开发者不想只是开发一个能够存储数据的系统,如关系数据库、Nosql数据库、搜索引擎等

    2024年02月16日
    浏览(52)
  • 消息中间件之八股面试回答篇:一、问题概览+MQ的应用场景+RabbitMQ如何保证消息不丢失(生产者确认机制、持久化、消费者确认机制)+回答模板

    目前主流的消息队列技术(MQ技术)分为RabbitMQ和Kafka,其中深蓝色为只要是MQ,一般都会问到的问题。浅蓝色是针对RabbitMQ的特性的问题。蓝紫色为针对Kafka的特性的问题。 MQ主要提供的功能为:异步 解耦 削峰 。 展开来讲就是 异步发送(验证码、短信、邮件…) MYSQL和Redi

    2024年01月24日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包