kafka的安装和基本操作

这篇具有很好参考价值的文章主要介绍了kafka的安装和基本操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基本概念

简介

Kafka 最初是由 LinkedIn 即领英公司基于 Scala 和 Java 语言开发的分布式消息发布-订阅系统,现已捐献给Apache 软件基金会。其具有高吞吐、低延迟的特性,许多大数据实时流式处理系统比如 Storm、Spark、Flink等都能很好地与之集成。

总的来讲,Kafka 通常具有 3 重角色:

  • 存储系统:通常消息队列会把消息持久化到磁盘,防止消息丢失,保证消息可靠性。Kafka 的消息持久化机制和多副本机制使其能够作为通用数据存储系统来使用。
  • 消息系统:Kafka 和传统的消息队列比如 RabbitMQ、RocketMQ、ActiveMQ 类似,支持流量削峰、服务解耦、异步通信等核心功能。 ==》 先进先出 ==》 只针对分区,不是全局的
  • 流处理平台:Kafka 不仅能够与大多数流式计算框架完美整合,并且自身也提供了一个完整的流式处理库,即 Kafka Streaming。Kafka Streaming 提供了类似 Flink 中的窗口、聚合、变换、连接等功能。

一句话概括:Kafka 是一个分布式的基于发布/订阅模式的消息中间件,在业界主要应用于大数据实时流式计算领域,起解耦合和削峰填谷的作用。

特点

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, 由多个consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中有节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

Kafka在各种应用场景中,起到的作用可以归纳为这么几个术语:削峰填谷,解耦!
在大数据流式计算领域中,kafka主要作为计算系统的前置缓存和输出结果缓存;

安装部署

kafka基于Zookeeper, 因此需要先安装Zookeeper, 详见https://www.cnblogs.com/paopaoT/p/17461562.html

  1. 上传安装包
  2. 解压
tar -zxvf kafka_2.11-2.2.2.tgz tar  -C /opt/apps/
  1. 修改配置文件
# 进入配置文件目录
cd kafka_2.12-2.3.1/config
# 编辑配置文件
vi server.properties
# 为依次增长的:0、1、2、3、4,集群中唯一 id
broker.id=0
# 数据存储的⽬录 
log.dirs=/opt/data/kafka
# 底层存储的数据(日志)留存时长(默认7天)
log.retention.hours=168
# 底层存储的数据(日志)留存量(默认1G)
log.retention.bytes=1073741824
# 指定zk集群地址
zookeeper.connect=linux01:2181,linux02:2181,linux03:2181
  1. 环境变量
vi /etc/profile
export KAFKA_HOME=/opt/apps/kafka_2.11-2.2.2
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
  1. 分发安装包
for  i  in {2..3}
do 
scp  -r  kafka_2.11-2.2.2  linux0$i:$PWD 
done

# 安装包分发后,记得修改config/server.properties中的 配置参数: broker.id
# 注意:还需要分发环境变量
  1. 启停集群(在各个节点上启动)
bin/kafka-server-start.sh -daemon /opt/apps/kafka_2.11-2.2.2/config/server.properties

# 停止集群
bin/kafka-server-stop.sh
  1. 一键启停脚本:
#!/bin/bash

case $1 in
"start"){
        for i in linux01 linux02 linux03
        do
        echo ---------- kafka $i 启动 ------------
                ssh $i "source /etc/profile;  /opt/app/kafka2.4.1/bin/kafka-server-start.sh -daemon /opt/app/kafka2.4.1/config/server.properties"
        done
};;
"stop"){
        for i in linux01 linux02 linux03
        do
        echo ---------- kafka $i 停止 ------------
                ssh $i "source /etc/profile;  /opt/app/kafka2.4.1/bin/kafka-server-stop.sh "
        done
};;
esac

基本操作

概述

Kafka 中提供了许多命令行工具(位于$KAFKA_HOME/bin 目录下)用于管理集群的变更。

脚本 作用
kafka-console-producer.sh 生产消息
kafka-topics.sh 管理主题
kafka-server-stop.sh 关闭Kafka服务
kafka-server-start.sh 启动Kafka服务
kafka-configs.sh 配置管理
kafka-consumer-perf-test.sh 测试消费性能
kafka-producer-perf-test.sh 测试生产性能
kafka-dump-log.sh 查看数据日志内容
kafka-preferred-replica-election.sh 优先副本的选举
kafka-reassign-partitions.sh 分区重分配

管理操作:kafka-topics

创建topic

--bootstrap-server 和 --zookeeper一样的效果 ,新版本建议使用 --bootstrap-server

kafka-topics.sh   --bootstrap-server  linux01:9092,linux02:9092,linux03:9092    --create --topic test01  --partitions 3  --replication-factor  3

参数解释:
--replication-factor  副本数量
--partitions 分区数量 
--topic topic名称
# 本方式,副本的存储位置是系统自动决定的


# 手动指定分配方案:分区数,副本数,存储位置
kafka-topics.sh --create --topic tpc-1  --zookeeper linux01:2181 --replica-assignment 0:1:3,1:2:6

该topic,将有如下partition:(2个分区 3个副本)
partition0 ,所在节点: broker0、broker1、broker3
partition1 ,所在节点: broker1、broker2、broker6

# 查看topic的状态信息
kafka-topics.sh --describe --topic tpc-1 --zookeeper linux01:2181
Topic: tpc-1    PartitionCount: 2       ReplicationFactor: 3    Configs: 
        Topic: tpc-1    Partition: 0    Leader: 0       Replicas: 0,1,3 Isr: 0,1
        Topic: tpc-1    Partition: 1    Leader: 1       Replicas: 1,2,6 Isr: 1,2

查看topic列表

kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --list

kafka-topics.sh --list --zookeeper linux01:2181
__consumer_offsets
tpc-1

查看topic状态信息

kafka-topics.sh --describe --zookeeper linux01:2181  --topic test
Topic: test     PartitionCount: 3       ReplicationFactor: 3    Configs: 
        Topic: test     Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: test     Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: test     Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0


# topic的分区数量,以及每个分区的副本数量,以及每个副本所在的broker节点,以及每个分区的leader副本所在broker节点,以及每个分区的ISR副本列表;
# ISR: in  sync  replica ,同步副同步本(当然也包含leader自身,replica.lag.time.max.ms =30000)
# OSR:out  of  sync replicas 失去同步的副本(该副本上次请求leader同步数据距现在的时间间隔超出配置阈值)

# ISR同步副本列表
# ISR概念:(同步副本)。每个分区的leader会维护一个ISR列表,ISR列表里面就是follower副本的Borker编号,只有跟得上Leader的 follower副本才能加入到 ISR里面
# 这个是通过replica.lag.time.max.ms =30000(默认值)参数配置的,只有ISR里的成员才有被选为 leader 的可能。

踢出ISR和重新加入ISR的条件:

  • 踢出ISR的条件: 由replica.lag.time.max.ms =30000决定,如上图;
  • 重新加入ISR的条件: OSR副本的LEO(log end offset)追上leader的LEO;

删除topic

bin/kafka-topics.sh --zookeeper linux01:2181 --delete --topic test
# 删除topic,server.properties中需要一个参数处于启用状态: delete.topic.enable = true(默认是true)

# 使用 kafka-topics .sh 脚本删除主题的行为本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径下建一个与待删除主题同名的节点,以标记该主题为待删除的状态。然后由 Kafka控制器异步完成。

增加分区数

kafka-topics.sh --zookeeper linux01:2181 --alter --topic paopao --partitions 3

# Kafka只支持增加分区,不支持减少分区
# 原因是:减少分区,代价太大(数据的转移,日志段拼接合并)
# 如果真的需要实现此功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去;

动态配置topic参数(不常用)

# 通过管理命令,可以为已创建的topic增加、修改、删除topic level参数
# 添加/修改  指定topic的配置参数:

kafka-topics.sh  --zookeeper linux01:2181  --alter  --topic tpc2 --config compression.type=gzip
# --config compression.type=gzip  修改或添加参数配置
# --add-config compression.type=gzip  添加参数配置
# --delete-config compression.type  删除配置参数

生产者:kafka-console-producer

kafka-console-producer.sh --broker-list linux01:9092 --topic test01
>a
>b
>c
>hello 
>hi
>hadoop
>hive

顺序轮询(老版本)

顺序分配,消息是均匀的分配给每个 partition,即每个分区存储一次消息,轮询策略是 Kafka Producer 提供的默认策略,如果你不使用指定的轮询策略的话,Kafka 默认会使用顺序轮训策略的方式。

随机分配

实现随机分配的代码只需要两行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size()); 

消费者:kafka-console-consumer

消费者在消费的时候,需要指定要订阅的主题,还可以指定消费的起始偏移量

起始偏移量的指定策略有3中:

  • earliest 起始点
  • latest 最新
  • 指定的offset( 分区号:偏移量) ==》 必须的告诉他是哪个topic 的哪个分区的哪个offset
  • 从之前所记录的偏移量开始消费

在命令行中,可以指定从什么地方开始消费

  1. 加上参数 --from-beginning 指定从最前面开始消费
  2. 如果不加--from-beginning 就需要分情况讨论了,如果之前记录过消费的位置,那么就从之前消费的位置开始消费,如果说之前没有记录过之前消费的偏移量,那么就从最新的位置开始消费

kafka的topic中的消息,是有序号的(序号叫消息偏移量),而且消息的偏移量是在各个partition中独立维护的,在各个分区内,都是从0开始递增编号!

# 消费消息
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic test01 --from-beginning
hive
hello
hadoop

# 指定从最前面开始消费
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --from-beginning
hadoop
list
hello
kafka

# 不指定他消费的位置的时候,就是从最新的地方开始消费
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao

# 指定要消费的分区,和要消费的起始offset
# 从指定的offset(需要指定偏移量和分区号)
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --offset 2 --partition 0
yy
abc
3333
2222

消费组

  • 消费组是kafka为了提高消费并行度的一种机制!
  • 在kafka的底层逻辑中,任何一个消费者都有自己所属的组(如果没有指定,系统会自己给你分配一个组id)
  • 组和组之间,没有任何关系,大家都可以消费到目标topic的所有数据
  • 但是组内的各个消费者,就只能读到自己所分配到的partitions
  • KAFKA中的消费组,可以动态增减消费者,而且消费组中的消费者数量发生任意变动,都会重新分配分区消费任务(消费者组在均衡策略)

如何让多个消费者组成一个组: 就是让这些消费者的groupId相同即可!

消费位移的记录

kafka的消费者,可以记录自己所消费到的消息偏移量,记录的这个偏移量就叫(消费位移);

记录这个消费到的位置,作用就在于消费者重启后可以接续上一次消费到位置来继续往后面消费;

消费位移,是组内共享的!!!消费位置记录在一个内置的topic中 ,默认是5s提交一次位移更新。
参数:auto.commit.interval.ms 默认是5s记录一次

#  可以使用特定的工具类 解析内置记录偏移量的topic
kafka-console-consumer.sh --bootstrap-server linux01:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
# 通过指定formatter工具类,来对__consumer_offsets主题中的数据进行解析;

[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889851318, expireTimestamp=None)
[g01,linux01,2]::OffsetAndMetadata(offset=17, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,1]::OffsetAndMetadata(offset=13, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)

# 如果需要获取某个特定 consumer-group的消费偏移量信息,则需要计算该消费组的偏移量记录所在分区: Math.abs(groupID.hashCode()) % numPartitions(50)
# 根据组id的hash取值%50 确定具体是将这个组具体每个分区消费到了哪里
# __consumer_offsets的分区数为:50

配置管理 kafka-config

kafka-configs.sh 脚本是专门用来进行动态参数配置操作的,这里的操作是运行状态修改原有的配置,如此可以达到动态变更的目的;一般情况下不会进行动态修改 。
动态配置的参数,会被存储在zookeeper上,因而是持久生效的
可用参数的查阅地址: https://kafka.apache.org/documentation/#configuration

# kafka-configs.sh 脚本包含:变更alter、查看describe 这两种指令类型;
# kafka-configs. sh 支持主题、 broker 、用户和客户端这4个类型的配置。
# kafka-configs.sh 脚本使用 entity-type 参数来指定操作配置的类型,并且使 entity-name参数来指定操作配置的名称。
# 比如查看topic的配置可以按如下方式执行:
kafka-configs.sh --zookeeper linux01:2181  --describe  --entity-type topics  --entity-name paopao

# 查看broker的动态配置可以按如下方式执行:
kafka-configs.sh  --describe --entity-type brokers --entity-name 0 --zookeeper linux01:2181

entity-type和entity-name的对应关系

kafka的安装和基本操作

# 示例:添加topic级别参数
kafka-configs.sh --zookeeper linux01:2181 --alter --entity-type topics --entity-name paopao --add-config cleanup.policy=compact,max.message.bytes=10000

# 示例:添加broker参数
kafka-configs.sh  --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000 --bootstrap-server linux01:9092,linux02:9092,linux03:9092

动态配置topic参数

通过管理命令,可以为已创建的topic增加、修改、删除topic level参数
添加/修改 指定topic的配置参数:文章来源地址https://www.toymoban.com/news/detail-474589.html

kafka-topics.sh  --topic paopao --alter  --config compression.type=gzip --zookeeper linux01:2181

# 如果利用 kafka-configs.sh 脚本来对topic、producer、consumer、broker等进行参数动态
# 添加、修改配置参数
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --add-config compression.type=gzip

# 删除配置参数
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --delete-config compression.type

到了这里,关于kafka的安装和基本操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka--kafka的基本概念-副本概念replica

    Broker 表示实际的物理机器节点 Broker1中的绿色P1表示主分片Broker2中的蓝色P1表示副本分片,其余类似,就是主从的概念,如果一个Broker挂掉了,还有其它的节点来保证数据的完整性 P可以看做分区 同一时间点,绿色P1 和紫色P1 不会完全一致,存在一个同步的过程 绿色部分处理

    2024年02月12日
    浏览(44)
  • DOS简介及基本操作

            简介 DOS,是磁盘操作系统(英文:Disk Operating System)的缩写,是个人计算机上的一类操作系统。Microsoft Windows版本是以DOS为基础,如Windows 95、98和Me等为例其商业寿命可以算到2000年。 DOS是个非常实用的操作系统,因此,它深深受到国内外人们的普遍喜爱,一直拥有

    2024年02月05日
    浏览(35)
  • 操作系统安全 基本概念

    参考教材是沈晴霓的《操作系统安全设计》,课程链接:https://www.coursera.org/learn/os-virtsecurity 本书内容由浅入深,分为“基础篇”、“理论篇”、“实践篇”和“趋势篇”四大部分。 “基础篇\\\"重点介绍操作系统基本安全概念、通用安全需求、安全标准和必要的安全机制等。

    2024年02月09日
    浏览(42)
  • 王道操作系统学习笔记(1)——操作系统基本概念

    本文介绍了操作系统的基本概念,文章中的内容来自B站王道考研操作系统课程,想要完整学习的可以到B站官方看完整版。 操作系统:系统资源的管理者(处理机管理、存储器管理、文件管理、设备管理) 交互式命令(在终端中输命令)和批处理命令(Shell脚本) 并发: 宏

    2024年02月10日
    浏览(52)
  • Kafka - 深入了解Kafka基础架构:Kafka的基本概念

    我们首先了解一些Kafka的基本概念。 1)Producer :消息生产者,就是向kafka broker发消息的客户端 2)Consumer :消息消费者,向kafka broker获取消息的客户端 3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个broker可以有多个

    2024年02月08日
    浏览(37)
  • 栈的概念及其基本操作--详细(C++)

    基本概念及相关术语: 栈是只允许 在一端 进行插入和删除操作的 线性表 。 由此可见,栈也是线性表的一种,只是栈的操作受限制的线性表。 栈顶(top):线性表允许插入和删除的那一段。 值得注意的是,栈顶指针top的指向是有些两种方式的,一种是指向栈顶当前元素,

    2024年02月08日
    浏览(38)
  • 大数据 - Kafka系列《一》- Kafka基本概念

    目录 🐶1.1 什么是kafka 🐶1.2 Kafka可以用来做什么 🐶1.3 kafka的特点 🥙1. 高吞吐量、低延迟 🥙2. 可扩展性 🥙3. 持久性、可靠性 🥙4. 容错性 🥙5. 高并发 🐶1.4 Kafka的基本架构 1. 🥙Producer:生产者 2. 🥙Broker:中间组件,存储数据 Topic:主题。类似于表的概念 partition:分区。

    2024年01月20日
    浏览(39)
  • kafka--技术文档-基本概念-《快速了解kafka》

    学习一种新的消息中间键,卡夫卡!!! 官网网址 Apache Kafka         Kafka是一种开源的分布式流处理平台,由Apache软件基金会开发,用Scala和Java编写。它是一个高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据。这种动作可以是网页浏览、

    2024年02月11日
    浏览(51)
  • kafka--kafka的基本概念-topic和partition

    topic是逻辑概念 以Topic机制来对消息进行分类的,同一类消息属于同一个Topic,你可以将每个topic看成是一个消息队列。 生产者(producer)将消息发送到相应的Topic,而消费者(consumer)通过从Topic拉取消息来消费 kafka中是要求消费者主动拉取消息消费的,它并不会主动推送消息

    2024年02月12日
    浏览(45)
  • 【数据结构】_7.二叉树概念与基本操作

    目录 1.树形结构 1.1 树的概念 1.2 树的相关概念 1.3 树的表示 1.4 树在实际中的应用—表示文件系统的目录树结构 ​编辑​2.二叉树 2.1 概念 2.2 特殊二叉树  2.3 二叉树的性质 2.4 二叉树的存储结构 2.4.1 顺序存储结构(数组存储结构) 2.4.2 链式存储结构 2.5 二叉树的基本操作 2

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包