Python操作Kafka基础教程

这篇具有很好参考价值的文章主要介绍了Python操作Kafka基础教程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

01 Python操作Kafka基础教程

创建ZooKeeper容器

docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

创建Kafka容器

语法是:

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=[你的IP地址]:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[你的IP地址]:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

我的虚拟机IP是192.168.31.86,所以我的命令是:

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.86:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.86:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

安装可视化工具

下载UI工具:https://kafkatool.com/download2/offsetexplorer_64bit.exe

下载好以后按照默认进行安装。

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

连接Kafka

搜索软件并打开:

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

配置zookeeper:

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

配置Kafka:

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

点击Test测试按钮,测试是否能够连接Kafka:

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

点击是,然后就成功的使用客户端连接上Kafka了。

Python操作Kafka基础教程,Kafka,python,kafka,开发语言

安装依赖

安装Python3.8

安装:文章来源地址https://www.toymoban.com/news/detail-831176.html

pip install kafka-python==2.0.2

发布和消费json数据

生产者

from kafka import KafkaProducer
import json

# 创建生产者
producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=['127.0.0.1:9092']
)

# 要提交的消息
msg_dict = {
    "operatorId": "test",  # 公交公司ID
    "terminalId": "123",  # 设备Id
    "terminalCode": "123",  # 设备编码(使用车辆ID)
    "terminalNo": "1",  # 同一车辆内terminal序号从1开始
}

# 向指定的主题发送消息
producer.send("text1", msg_dict)
producer.close()

消费者

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')

# 不停的消费数据
for msg in consumer:
    print(msg.value.decode())

发布和消费文本数据

生产者

from kafka import KafkaProducer

# 创建生产者
producer = KafkaProducer(
    value_serializer=lambda v: v.encode('utf-8'),
    bootstrap_servers=['127.0.0.1:9092']
)

# 向指定的主题发送消息
producer.send("text1", "你好")
producer.close()

消费者

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')

# 不停的消费数据
for msg in consumer:
    print(msg.value.decode())

发布和消费键值对文本数据

生产者

from kafka import KafkaProducer

# 创建生产者
producer = KafkaProducer(
    key_serializer=lambda v: v.encode('utf-8'),
    value_serializer=lambda v: v.encode('utf-8'),
    bootstrap_servers=['127.0.0.1:9092']
)

# 向指定的主题发送消息
producer.send("text1", key="msg", value="你好")
producer.close()

消费者

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')

# 不停的消费数据
for msg in consumer:
    print("key=", msg.key.decode())
    print("value=", msg.value.decode())

发布和消费键值对JSON数据

生产者

from kafka import KafkaProducer
import json

# 创建生产者
producer = KafkaProducer(
    key_serializer=lambda v: json.dumps(v).encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=['127.0.0.1:9092']
)

# 向指定的主题发送消息
key = {"a": 1}
value = {"b": 2}
producer.send("text1", key=key, value=value)
producer.close()

消费者

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')

# 不停的消费数据
for msg in consumer:
    print("key=", msg.key.decode())
    print("value=", msg.value.decode())

发布和消费压缩文本数据

生产者

from kafka import KafkaProducer

# 创建生产者
producer = KafkaProducer(
    value_serializer=lambda v: v.encode('utf-8'),
    bootstrap_servers=['127.0.0.1:9092'],
    compression_type='gzip',  # 通过此参数声明要压缩数据传输
)

# 向指定的主题发送消息
producer.send("text1", "你好")
producer.close()

消费者

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')

# 不停的消费数据
for msg in consumer:
    print(msg.value.decode())

同时消费多个主题

生产者

from kafka import KafkaProducer

# 创建生产者
producer = KafkaProducer(
    value_serializer=lambda v: v.encode('utf-8'),
    bootstrap_servers=['127.0.0.1:9092']
)

# 向指定的主题发送消息
producer.send("text1", "你好")
producer.send("text2", "你好")

producer.send("text1", "你好1")
producer.send("text2", "你好1")

producer.send("text1", "你好2")
producer.send("text2", "你好2")


producer.close()

消费者

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:9092')

# 不停的消费数据
consumer.subscribe(["text1", "text2"])
for msg in consumer:
    print(msg)
    print(msg.topic)
    print(msg.value.decode())

获取发布结果

生产者

from kafka import KafkaProducer

# 创建生产者
producer = KafkaProducer(
    value_serializer=lambda v: v.encode('utf-8'),
    bootstrap_servers=['127.0.0.1:9092']
)

# 向指定的主题发送消息
feature = producer.send("text1", "你好")

# 会阻塞,直到发送成功
print(feature.get(timeout=60))


producer.close()

消费者

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer("text1", bootstrap_servers='127.0.0.1:9092')

# 不停的消费数据
for msg in consumer:
    print(msg.topic)
    print(msg.value.decode())

到了这里,关于Python操作Kafka基础教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据系列教程之 Kafka基础

    kafka概述 一、kafka概述 1.1 定义 1.2 消息队列 1.2.1 传统消息队列的应用场景 1.2.2 消息队列的两种形式 1.3 Kafka 基础架构 二、kafka安装部署 2.1安装部署 2.1.1.jar包下载 2.1.2.解压到指定的文件夹下 2.1.3.创建两个文件夹以供后续使用 2.1.4. 修改配置文件 (1)修改zookeeper.properties 文件

    2024年02月10日
    浏览(29)
  • Go操作各大消息队列教程(RabbitMQ、Kafka)

    1.1 概念 ①基本名词 当前市面上mq的产品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐献给Apache的RocketMQ。甚至连redis这种NoSQL都支持MQ的功能。 Broker:表示消息队列服务实体 Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。vhost是AMQP概念的基础,必须在链

    2024年02月11日
    浏览(36)
  • 【开发语言】C语言与Python的互操作详解

    博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 本人就职于国际知名终端厂商,负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G算力网络技术标准研究。 博客内容主要围绕:        5G/6G协议

    2024年02月10日
    浏览(66)
  • Python基础教程--3.1文件的相关操作

    在 Python 中,文件操作是一个非常重要的部分。本文将介绍如何在 Python 中打开、读写、删除和重命名文件。 在 Python 中,可以使用 open() 函数打开文件。该函数需要两个参数:文件路径和打开模式。 例如,如果要打开名为 example.txt 的文件,可以使用以下代码,r 表示读取模式

    2023年04月08日
    浏览(37)
  • 自动化理论基础(2)—开发语言之Python

    一、知识汇总 掌握 Python 编程语言需要具备一定的基础知识和技能,特别是对于从事自动化测试等领域的工程师。以下是掌握 Python 的一些关键方面: 基本语法: 理解 Python 的基本语法,包括变量、数据类型、运算符、条件语句、循环语句等。 数据结构: 熟悉并能够使用

    2024年01月18日
    浏览(61)
  • kafka--python

    Producer:即生产者,消息的产生者,是消息的入口; Consumer:消费者,即消息的消费方,是消息的出口; Broker:中间代理,即一个broker就是一个server。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等…… Topic(主题):可以理解为消息的分类,kafka的

    2024年04月11日
    浏览(35)
  • python 读写kafka

    测试 测试

    2024年01月20日
    浏览(44)
  • python消费kafka数据

    上一篇文章是生产数据:python向kafka发送json数据_grfstc的博客-CSDN博客 1.安装kafka支持库 2.创建python文件 3.运行该python文件 注意: 该python文件会持续消费kafka数据,如果要停止消费,需手动退出程序。 或者可以设置达到特定偏移量退出for循环来停止消费:  运行效果:      

    2024年02月12日
    浏览(67)
  • 学习如何使用 Python 连接 MongoDB: PyMongo 安装和基础操作教程

    Python 可以用于数据库应用程序。最流行的 NoSQL 数据库之一是 MongoDB MongoDB 将数据存储在类似 JSON 的文档中,使数据库非常灵活和可扩展。 您可以在 MongoDB 官网 上下载免费的 MongoDB 数据库 Python 需要一个 MongoDB 驱动程序来访问 MongoDB 数据库。我将使用 MongoDB 驱动程序 PyMongo 建

    2024年02月02日
    浏览(37)
  • Python消费Kafka与优化

         之前使用kafka-python库进行消费数据处理业务逻辑,但是没有深入里面的一些细节,导致会遇到一些坑。正常普通我们常见的一个消费者代码:(假设topic的分区数是20个)    这个代码本身没什么问题,消费数据都正常。但是消费能力随着数据写入量的增加,消费者消费能力

    2024年02月07日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包