【超级详细】熟悉Kafka的基本使用方法的实验【Windows】

这篇具有很好参考价值的文章主要介绍了【超级详细】熟悉Kafka的基本使用方法的实验【Windows】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

Kafka 是由 Apache 软件基金会开发的一个开源消息队列平台,它是一种高性能、可扩展、分布式的发布-订阅消息系统。Kafka 的架构被设计为高效、低延迟,并具有高吞吐量、持久性和可靠性。

在 Kafka 中,生产者将消息发布到主题(topic)中,消费者则从主题中消费消息,使用者可以将其看作一个 highly scalable 分布式 commit log 或者消息系统 (Messaging system),每个消息包含一个 key,一个 value 和一个额外的 timestamp。消息保留时间通过配置进行控制,当时间或空间满了的时候就根据策略来清除老数据,默认情况下老数据只保存 7 天。

特点:
1.高吞吐量:Kafka 在发布-订阅消息方面具有非常高的性能。它可以几乎实时地处理高速流入的大量数据。
实时处理:Kafka 能够处理高达数以百万计的消息,并准确地将消息排序和在群组内进行调度。
2.持久性和可靠性:与传统的消息系统不同,Kafka 具有持久性和可靠性。客户端自己提交当前偏移量,避免了可能出现的重复读取问题。
3.可扩展性:Kafka 可以在不繁琐的配置或修改信息格式等环节就能进行扩展。
4.多样化数据类型和来源:通过使用支持多种编程语言和操作系统的 API,Kafka 可以连接到许多各种来源的应用程序。

总之,Kafka 具有高性能、低时延,适合处理大规模物联网设备、日志、报警信息、传感器数据、消息等。

所以今天就来写一份关于熟悉Kafka的基本使用方法的实验,希望可以与小伙伴们一起探讨~~😉😉


一、实验平台

(1)操作系统:Windows7及以上(我用的是Windows 11)
(2)Kafka版本:kafka_2.12-2.4.0
(3)MySQL版本:8.0

二、实验内容

一、Kafka与MySQL的组合使用

1.实验要求

假设有一个学生表student,如下表所示,编写Python程序完成如下操作。
(1)读取student表的数据内容,将其转换为JSON格式,发送给Kafka
(2)从Kafka中获取JSON格式数据,打印出来

sno sname ssex sage
95001 John M 23
95002 Tom M 23

2.在MySQL中操作

(1)打开MySQL
方式一:
kafka windows,python,kafka,python,人工智能,大数据,开发语言
方式二:

可以通过 DOS 命令启动 MySQL 服务,windows+R,在搜索框中输入cmd,进去之后再输入services.msc,就进去服务系统里了,再启动就行
kafka windows,python,kafka,python,人工智能,大数据,开发语言
kafka windows,python,kafka,python,人工智能,大数据,开发语言
进去以后输入密码就可以开始执行mysql语句了
(2)创建数据库

create database school001;

(3)查看数据库

show databases;

发现数据库已经被创建完成
kafka windows,python,kafka,python,人工智能,大数据,开发语言
(4)使用该数据库

use school001;

(5)在该数据库中创建student表

create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));

(6)查询该数据库中的student表

show tables;

(7)向student表中插入值

insert into student values("95001","John","M",23);
insert into student values("95002","Tom","M",23);

(8)查询student表中的数据

select * from student;

查询结果:
kafka windows,python,kafka,python,人工智能,大数据,开发语言
(到这里我们的student表就创建成功了!)😊😊

3.安装Kafka

简单介绍:
Kafka 的运行需要 Java 环境的支持,因此,安装 Kafka 前需要在 Windows 操作系统中安装 JDK
访问 Kafka 官网,下载 Kafka 2.4.0的安装文件 kafka 2.12-2.4.0.1gz,解压缩到" C : \ "目录下(也可以放到D盘,不过最好放在D盘根目录下,不然后续代码容易报错,我试过)
因为 Katka 的运行依赖于 Zookeeper ,因此,还需要下载并安装 Zookeeper 。当然, Kafka 也内置了 Zookeeper 服务,因此,也可以不额外安装 Zookeeper ,直接使用内置的Zookeeper 服务。为简单起见,这里直接使用内置的Zookeeper 服务。

win+r—>输入cmd然后回车

输入命令pip install kafka-python安装python-kafka模块

查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)

具体怎么安装可参考:kafka安装部署

4.使用Kafka

在实验中要用到Kafka就要先启动它的Zookeeper服务和Kafka,且在实验过程中,千万不可以将其关闭,一旦关闭,服务就会停止😡😡
在 Windows 操作系统中
打开第1个 cmd 命令行窗口,启动 Zookeeper 服务

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

注意,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Zookeeper 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Zookeeper 服务就会停止
如图:
kafka windows,python,kafka,python,人工智能,大数据,开发语言
打开第2个 cmd 命令行窗口,然后输入如下命令启动 Kafka 服务

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-server-start.bat .\config\server.Properties

同样地,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Kafka 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Kafka 服务就会停止
kafka windows,python,kafka,python,人工智能,大数据,开发语言
若执行上面的命令以后,如果启动失败,并且出现提示信息"此时不应有\QuickTime\QTSstem\QTJava.zip ",则需要把环境变量 CLASSPATH 的相关信息删除。具体方法是,

右键单击"计算机",再单击"属性"一"高级系统设置"一"环境变量",然后,找到变量 CLASSPATH ,把类似下面的信息删除:
C : Program Files (x86) QuickTime\QTSystem QTJava . zip

然后重新启动计算机,让配置修改生效。重新启动计算机以后,再次按照上面的方法启动Zookeeper和Kafka

为了测试 Kafka ,这里创建一个主题,名称为" topic_test ",其包含一个分区,只有一个副本。在第3个 cmd 命令行窗口中执行如下命令

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-topics.bat -- create -- zookeeper localhost:2181-- replication -
 factor 1-- partitions 1-- topic topic_test 

kafka windows,python,kafka,python,人工智能,大数据,开发语言
可以继续执行如下命令,查看 topic _ test 是否创建成功:

.\bin\windows\kafka-topics.bat -- list -- zookeeper localhost:2181

kafka windows,python,kafka,python,人工智能,大数据,开发语言
如果创建成功,就可以在执行结果中看到 topic _ test
继续在第3个 cmd 命令行窗口中执行如下命令,创建一个生产者来产生消息

.\bin\windows\kafka-console-producer.bat -- broker-list localhost :9092 -topic topic_test 

该命令执行以后,屏幕上的光标会持续闪烁,这时,可以用键盘输入一些内容,例如:
I love Kafka
Kafka is good
新建第4个 cmd 命令行窗口,执行如下命令来消费消息

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic_test --from-beginning 

该命令执行以后,屏幕上显示刚才输入的语句" I love Kafka “和” Kafka is good "

5.在PyCharm中操作

  1. 创建一个.py文件,写入以下代码,用于实现读取student表的数据内容,将其转换为JSON格式,发送给Kafka的功能
# 运行前先在win上启动zookeap和kafka
# 导入相关模块
from kafka import KafkaProducer
import json

# 连接kafka  json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开
data = {
    'sno': '95001',
    'sname': 'John',
    'ssex': 'M',
    'sage': 23
}
# 发送数据
producer.send('test001', data)
# 关闭资源
producer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,写入以下代码,用于实现从Kafka中获取JSON格式数据,打印出来的功能
# 运行前先在win上启动mysql
# 导入消费模块
import json
# 导入kafka的消费模块
from kafka import KafkaConsumer
import json
import pymysql.cursors

# 连接kafka
consumer = KafkaConsumer('test001', bootstrap_servers='localhost:9092', group_id=None, auto_offset_reset='earliest')
# 对获取的数据进行解析
for msg in consumer:
    # 转换为字符串类型
    msg1 = str(msg.value, encoding=('utf-8'))
    # 将字符串的数据加载为字典
    dict = json.loads(msg1)
    # 连接数据库
    connect = pymysql.Connect(
        host='localhost',
        port=3306,
        user='root',
        passwd='xxxxxxxx',#这是你MySQL数据库的密码
        db='school001',
        charset='utf8'
    )
    # 获取操作数抠库的对象<游标>
    cursor = connect.cursor()
    # 将数抠织存到mysqL(插入数掷)
    # 定义sql语句
    sql = "select * from student;"
    # 将数据作为参数传速给sqL,保存到hrgsql
    cursor.execute(sql)
    # 提交
    connect.commit()
    for row in cursor.fetchall():
        print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
    print("共查询出", cursor.rowcount, '条数据')
    connect.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

二、消费者手动提交

1.实验要求

生成一个data.json文件,内容如下:
data = [
{“name”: “Tony”, “age”: 21, “hobbies”: [“basketball”, “tennis”]},
{“name”: “Lisa”, “age”: 20, “hobbies”: [“sing”, “dance”]},
]
根据上面给出的data.json文件,执行如下操作。
(1)编写生产者程序,将JSON文件数据发送给Kafka。
(2)编写消费者程序,读取Kafka的JSON格式数据,并手动提交偏移量。

2.在PyCharm中操作

  1. 创建一个Test写入以下代码,来实现生成data.json文件的功能:
import json

data = [
    {"name": "Tony", "age": 21, "hobbies": ["basketball", "tennis"]},
    {"name": "Lisa", "age": 20, "hobbies": ["sing", "dance"]},
]

with open('../../data.json', 'w') as f:
    json.dump(data, f)
  1. 创建一个.py文件,编写生产者程序,来实现将JSON文件数据发送给Kafka的功能
# 可以使用 Python 的 json 模块读取 data.json 文件,并将数据转换为字符串后发送给 Kafka

from kafka import KafkaProducer
import json

data = [
    {
        "name": "Tony",
        "age": 21,
        "hobbies": ["basketball", "tennis"]
    },
    {
        "name": "Lisa",
        "age": 20,
        "hobbies": ["sing", "dance"]
    }
]

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for item in data:
    # 将数据转换为字符串格式并发送给 Kafka 主题 test
    message = json.dumps(item).encode('utf-8')
    producer.send('test', value=message)

producer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,编写消费者程序,来实现读取Kafka的JSON格式数据,并手动提交偏移量的功能
# 我们可以使用 Kafka 消费者 API 进行数据消费,并在处理完每个消息后手动提交偏移量。

from kafka import KafkaConsumer
import json

# 配置 Kafka 消费者,指定主题和分组等信息
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # 禁止自动提交偏移量
    group_id='my-group')

# 循环消费 Kafka 消息
for message in consumer:
    # 将传入的二进制消息内容解码为 JSON 格式的字符串
    item = json.loads(message.value.decode('utf-8'))
    print(item)

    # 手动提交偏移量,确保下次消费时从正确的位置开始
    consumer.commit()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

三、Kafka消费者订阅分区

1.实验要求

在命令行窗口中启动Kafka后,手动创建主题 “assign_topic” ,分区数量为2。具体命令如下:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

根据上面给出的主题,完成如下操作。
(1)编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic” 。
(2)编写消费者程序1,订阅主题的分区0,只消费分区0数据。
(3)编写消费者程序2,订阅主题的分区1,只消费分区1数据。

2.在终端操作

首先要完成主题以及分区的创建才能编写程序,不然程序会报错
步骤:

  1. 使用windows+r,在弹窗中输入cmd打开终端
  2. 在终端中输入命令,创建主题和分区:
cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

结果如下图(这是我之前已经创建好的结果图):kafka windows,python,kafka,python,人工智能,大数据,开发语言

3.在PyCharm中操作

  1. 创建一个.py文件,写入以下代码,用于实现编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic的功能:
from kafka import KafkaProducer
import uuid

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for i in range(5):
    message = str(uuid.uuid4()).encode('utf-8')
    producer.send('assign_topic', value=message)

producer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,写入以下代码,用于实现订阅主题的分区0,只消费分区0数据的功能:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 0)])

for message in consumer:
    print("Partition 0 - Message value: {}".format(message.value))

consumer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,写入以下代码,用于实现订阅主题的分区1,只消费分区1数据的功能:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 1)])

for message in consumer:
    print("Partition 1 - Message value: {}".format(message.value))

consumer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

三、实验小bug

1. Kafka连接报错:kafka.errors.NoBrokersAvailable: NoBrokersAvailable 是什么原因:?
答:是因为程序运行了多次的原因
把tmp文件和logs文件里面的东西都删掉,就可以解决了
kafka windows,python,kafka,python,人工智能,大数据,开发语言
kafka windows,python,kafka,python,人工智能,大数据,开发语言
2. 为什么消费者程序1中有东西输出而消费者程序2中什么却什么也没输出?

消费者程序1和消费者程序2是对同一个主题的两个消费者应用程序。可以针对以下情况进行分析。
在主题 assign_topic 中,Kafka有多个分区,可用于并行处理消息。在这里,被消费的消息都来自此主题的第一个分区(即分区 0)。
消费者程序1使用了 .subscribe() 方法来订阅主题,这将导致消费者加入到消费组中,然后通过负载均衡策略从所有分区接收消息。因此,消费者程序1输出打印了分区 0 中的消息。
消费者程序2使用了 .assign() 方法手动分配消费者处理的分区,而且只分配了主题 assign_topic 的第一个分区(即分区 0)。但是,由于该程序没有运行足够长的时间,并且没有消费到任何未提交的偏移量,所以当应用程序终止时不会向Kafka服务器发送任何提交请求,这就可能导致在下一次启动时重复消费确认过的消息。因此,在生产环境中,请务必根据具体情况定期地提交所消费的分区的偏移量。


总结

以上就是对Kafka的基本使用方法的实验啦,有不明白的地方可以留言哦,希望能共同进步~~😀😀😀😀😀😀文章来源地址https://www.toymoban.com/news/detail-732112.html

到了这里,关于【超级详细】熟悉Kafka的基本使用方法的实验【Windows】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • c#客户端Kafka的使用方法

    Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,现在是Apache软件基金会的顶级项目之一。Kafka能够处理大规模的实时数据流,支持高可靠性、高可扩展性、低延迟和高吞吐量。它主要用于构建实时数据管道和流式处理应用程序。 Kafka的核心概念包括:Producer(生产者)

    2024年02月12日
    浏览(60)
  • vim基本使用方法

    vim是linux上一个有多个编辑模式的编辑器。 这里主要介绍三种模式: 命令模式(Normal mode) 执行命令的模式,主要任务就是控制光标移动、复制和删除。 插入模式(Insert mode) 可以进行文字输入,编写代码模式。 末行/底行模式(last line mode) 文件保存退出,文本替换、列出

    2024年02月12日
    浏览(39)
  • docker基本使用方法

    Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。Docker 使您能够将应用程序与基础架构分开,从而可以快速交付软件。通过利用 Docker 的方法来快速交付,测试和部署代码,您可以大大减

    2024年02月13日
    浏览(45)
  • uCharts基本使用方法

    首先下载ucharts文件 https://gitee.com/uCharts/uCharts 下载下来看到有这些文件,小伙伴们可以先去示例项目里面看 引入u-charts.js文件,主要构建就是new uCharts和配置context,其他的就跟其他charts配置一样 可以看例子写的,也可以自己试验一波 方法写入两种方式 第一种方式 ucharts下载

    2024年02月04日
    浏览(42)
  • Wireshark基本使用方法

    目录 1、Wireshark介绍 1.1 Wireshark使用 1.2 支持的协议 2.Wireshark主要应用 3.Wireshark安装  4.Wireshark页面介绍 4.1 分组列表  4.2 分组详情  4.3 分组字节流  5.Wireshark导航 5.1 开始捕获分组 5.2 停止捕获分组 5.3 重新开始当前捕获 5.4、捕获选项 5.5 打开以保存的捕获文件 5.6 保存捕

    2024年02月13日
    浏览(37)
  • pandas库基本使用方法

    Pandas是Python中一个非常流行的数据处理库,它提供了一些强大的数据结构和数据分析工具,可以帮助我们更方便、快捷地处理数据。下面我们来介绍一下Pandas的使用方法。 1.导入Pandas库 在使用Pandas之前,需要先导入Pandas库。通常的做法是使用import语句导入Pandas库,并给它起一

    2024年02月10日
    浏览(38)
  • anaconda 创建虚拟环境、激活,使用的基本方法及安装包的基本方法

    第一步 打开Anaconda Prompt 可以看到这里是base环境。 第二步 我们现在要创建一个新的虚拟环境,名叫test,且python版本为3.8 在安装过程中会出现下面这个选项,输入y就好了 创建成功如下图所示!hiahia! 我们已经学会如何创建新的环境了!没错!我们非常棒!下面我们就看看,

    2024年03月14日
    浏览(67)
  • Loguru基本、进阶使用方法小结。

    loguru是一个开源的Python日志记录器,它提供了简单且易于使用的接口,同时具有高度的可定制性。loguru的特点包括:支持格式化日志、记录到文件或终端、支持自动清理日志、支持旋转日志等。 loguru的基本使用方法非常简单,只需要导入loguru模块,并使用logger函数创建一个日

    2024年02月16日
    浏览(36)
  • java反射的基本使用方法

    当我们使用 Java 开发时,有时需要获取某个类的信息,例如类的属性、方法和构造函数等,然后在程序运行期间动态地操作它们。Java 反射就是用来实现这个目的的一种机制。 Java 反射(Reflection)是 Java 编程语言在运行时动态获取类的信息以及动态调用对象方法的能力。它可

    2024年02月16日
    浏览(38)
  • Selenium介绍及基本使用方法

    Selenium是一个开源、免费、简单、灵活,对Web浏览器支持良好的自动化测试工具,在UI自动化、爬虫等场景下是十分实用的,能够熟练掌握并使用Selenium工具可以大大的提高效率。 Selenium简介 Selenium支持多平台、多浏览器、多语言去实现自动化测试,是一个开源和可移植的Web测

    2024年02月04日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包