【Kafka】概述与集群部署

这篇具有很好参考价值的文章主要介绍了【Kafka】概述与集群部署。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka概述

定义

kafka是一种分布式的,基于发布/订阅的消息队列 (MessageQueue)。它可以处理消费者在网站中的所有动作流数据。

Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。(既想处理消息队列,又想处理数据)

【Kafka】概述与集群部署

应用场景

缓冲/削峰

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

【Kafka】概述与集群部署
解耦

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

【Kafka】概述与集群部署
异步通信

允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

【Kafka】概述与集群部署

应用模式

点对点模式

消费者主动拉去数据,消息收到后清除消息

【Kafka】概述与集群部署
发布/订阅模式

• 可以有多个topic主题(浏览、点赞、收藏、评论等)
• 消费者消费数据之后,不删除数据
• 每个消费者相互独立,都可以消费到数据

【Kafka】概述与集群部署

基础架构

【Kafka】概述与集群部署
【Kafka】概述与集群部署

(1)Producer:消息生产者,就是向Kafka broker发消息的客户端。
(2)Consumer:消息消费者,向Kafka broker取消息的客户端。
(3)Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
(6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
(7)Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。
(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
(9)Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。

Kafka集群部署

集群规划

hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka

下载解压

官网地址;https://kafka.apache.org/downloads

# 下载

cd /opt/module

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz

# 解压

tar -zxvf kafka_2.12-3.4.0.tgz -C /opt/module/

# 改名

mv kafka_2.12-3.4.0/ kafka

【Kafka】概述与集群部署

修改配置文件

路径:config/server.properties

  • broker.id=0
  • log.dirs=/opt/module/kafka/datas
  • zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

分发安装包

这一步的作用是让另外几台服务器也都有 kafka/ 这套文件

  • 命令:xsync kafka/
  • 下面是xsync.sh脚本的内容
#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi

#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
    echo ====================  $host  ====================
    #3. 遍历所有目录,挨个发送

    for file in $@
    do
        #4. 判断文件是否存在
        if [ -e $file ]
            then
                #5. 获取父目录
                pdir=$(cd -P $(dirname $file); pwd)

                #6. 获取当前文件的名称
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exists!
        fi
    done
done

hadoop103、hadoop104修改配置文件

# broker.id不得重复,整个集群中唯一
# hadoop103对应的
broker.id=1

# hadoop104对应的
broker.id=2

配置环境变量

路径:vim /etc/profile.d/my_env.sh

注意:集群内的机子都需要配一遍

# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新一下环境变量

source /etc/profile

启动集群

先启动Zookeeper集群
kafka/zk.sh start


vim kafka/zk.sh
#!/bin/bash

case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 启动 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
	done
}
;;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 停止 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
}
;;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 状态 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
	done
}
;;
esac

然后启动Kafka
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties

关闭集群

bin/kafka-server-stop.sh

集群启停脚本

脚本编写

vim kf.sh

#! /bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

添加执行权限

chmod +x kf.sh

启动集群脚本命令

kf.sh start

停止集群脚本命令

kf.sh stop

注意: 停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

Docker启动Kafka集群

docker-compose.yml编写

version: '3'
services:
  li-zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "7181:2181"
    networks:
      - li-kafka-net

  li-kafka-1:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7091
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7091:9092"
    networks:
      - li-kafka-net

  li-kafka-2:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7092
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7092:9092"
    networks:
      - li-kafka-net

  li-kafka-3:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7093
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7093:9092"
    networks:
      - li-kafka-net

  li-kafka-map:
    image: dushixiang/kafka-map:latest
    environment:
      KAFKA_MAP_KAFKA_SERVERS: li-kafka-1:9092,li-kafka-2:9092,li-kafka-3:9092
      KAFKA_MAP_USERNAME: admin
      KAFKA_MAP_PASSWORD: admin
    ports:
      - "8080:8080"
    networks:
      - li-kafka-net

networks:
  li-kafka-net:
    driver: bridge

启动compose

docker-compose up -d

命令行验证

docker exec -it kafka-server bash
cd /opt/bitnami/kafka

列出所有topics
kafka-topics.sh --bootstrap-server 139.196.169.148:7091 --list

创建一个topic
kafka-topics.sh --bootstrap-server 139.196.169.148:7092 --create --partitions 1 --replication-factor 3 --topic first

生产者
kafka-console-producer.sh --bootstrap-server 139.196.169.148:7091  --topic first 

消费者
kafka-console-consumer.sh --bootstrap-server 139.196.169.148:7092 --from-beginning --topic first

Python验证

  • 安装包

    pip install kafka-python
    
  • 生产者

    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    # Kafka 集群地址
    bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093']
    
    # Kafka 主题名称
    topic = 'first'
    
    # 创建 Kafka 生产者
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    
    
    # 发送消息到 Kafka 主题
    def send_message(message):
        try:
            producer.send(topic, message.encode('utf-8'))
            producer.flush()
            print('Message sent successfully:', message)
        except KafkaError as e:
            print('Failed to send message:', e)
    
    
    # 测试发送消息
    send_message('Hello, Kafka!')
    
  • 消费者文章来源地址https://www.toymoban.com/news/detail-441054.html

    from kafka import KafkaConsumer
    
    # Kafka 集群地址
    bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093']
    
    # Kafka 主题名称
    topic = 'first'
    
    # 创建 Kafka 消费者
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',  # 从最早的消息开始消费
        enable_auto_commit=True,  # 自动提交消费位移
        group_id='my-group')  # 消费者组名称
    
    
    # 从 Kafka 主题消费消息
    def consume_message():
        for message in consumer:
            print('Message received:', message.value.decode('utf-8'))
    
    
    # 测试消费消息
    consume_message()
    

到了这里,关于【Kafka】概述与集群部署的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ansible部署kafka集群

    其中一台作为Ansible的母机并命名为ansible,另外三台云主机命名为node1、node2、node3,通过附件中的/ansible/ansible.tar.gz软件包在ansible节点安装Ansible服务;使用这一台母机,编写Ansible脚本(在/root目录下创建example目录作为Ansible工作目录,部署的入口文件命名为cscc_install.yaml),编

    2024年01月22日
    浏览(42)
  • Kafka 集群部署

    目录 1、环境准备 2、搭建ZooKeeper集群 配置文件 节点标记 环境变量 启动集群 数据同步测试 故障测试 3、搭建 Kafka 集群 配置文件 环境变量 配置其他机器 启动服务 4、集群测试 创建 Topic 显示 Topic 配置 创建 Producer 创建consumer 删除Topic 查看Zookeeper元数据 一个Broker就是一个ka

    2024年01月20日
    浏览(27)
  • Kafka集群部署

      Kafka是一个高吞吐量、基于ZooKeeper(ZooKeeper维护Kafka的broker信息)的分布式发布订阅信息系统,它可以处理消费者在网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。通常情况下,使用Kafka构建系统或应用程序之间的数据管道,用来转换或响应实时数据,使

    2024年02月14日
    浏览(32)
  • Zookeeper、Kafka集群与Filebeat+Kafka+ELK架构、部署实例

    Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。 Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。 Zookeeper集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。 全局数据一致:每个Server保

    2024年02月08日
    浏览(48)
  • Kafka集群安装部署(自带zookeeper)

    •Kafka 将消息以 topic 为单位进行归纳。 • 将向 Kafka topic 发布消息的程序成为 producers. • 将预订 topics 并消费消息的程序成为 consumer. •Kafka 以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 broker. producers 通过网络将消息发送到 Kafka 集群,集群向消费者提

    2024年02月10日
    浏览(49)
  • Linux实战——Kafka集群安装部署

    Kafka是一款 分布式的、去中心化的、高吞吐低延迟、订阅模式 的消息队列系统。 同RabbitMQ一样,Kafka也是消息队列。不过RabbitMQ多用于后端系统,因其更加专注于消息的延迟和容错。 Kafka多用于大数据体系,因其更加专注于数据的吞吐能力。 Kafka多数都是运行在分布式(集群

    2023年04月09日
    浏览(54)
  • Kafka入门介绍+集群部署+简单使用

    官网:https://kafka.apache.org/ 中文文档:https://kafka1x.apachecn.org/intro.html Kafka是一个开源的分布式流处理平台 主要有三个关键功能 发布订阅事件流(可以用作消息队列) 分布式持久化存储事件流(可以用作数据处理系统) 可以在事件发生时处理或回顾性的处理 整体架构图如下:

    2024年04月27日
    浏览(38)
  • 手动部署Kraft模式Kafka集群

    IP地址 Hostname Release Kafka-Version 172.29.145.157 iamdemo1 Centos7.9 kafka_2.12-3.5.1 172.29.145.182 iamdemo2 Centos7.9 kafka_2.12-3.5.1 172.29.145.183 iamdemo3 Centos7.9 kafka_2.12-3.5.1 下载安装包 kafka安装包官网下载 下载完成后上传到服务器/opt目录下解压 生成集群随机uuid 配置kafka集群的kraft模式参数 使用集群

    2024年02月05日
    浏览(40)
  • kafka3.6.0集群部署

    环境准备 机器环境 系统 主机名 IP地址 centos7.9 kafka01 192.168.200.51 centos7.9 kafka02 192.168.200.52 centos7.9 kafka03 192.168.200.53 所需软件 hosts设置 java环境设置 zookeeper安装部署 创建软件安装目录 解压安装 修改配置 分发软件 kafka02与kafka03软链接 kafka02与kafka03修改myid 防火墙放行端口 设置

    2024年02月04日
    浏览(34)
  • kafka的原理及集群部署详解

    消息队列分类 点对点 组成:消息队列(Queue)、发送者(Sender)、接收者(Receiver) 特点:一个生产者生产的消息只能被一个接受者接收,消息一旦被消费,消息就不在消息队列中了 发布/订阅 组成:消息队列(Queue)、发布者(Publisher)、订阅者(Subscriber)、主题(Topic)

    2024年02月02日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包