python消费kafka数据

这篇具有很好参考价值的文章主要介绍了python消费kafka数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

上一篇文章是生产数据:python向kafka发送json数据_grfstc的博客-CSDN博客

1.安装kafka支持库

pip install kafka-python

python kafka消费,Python,kafka,分布式,python

2.创建python文件

import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
	'FaultRecordLog',
	group_id='test_id',
	bootstrap_servers=['192.168.1.214:9092'],    # 要发送的kafka主题
	auto_offset_reset='earliest',  # 有两个参数值,earliest和latest,如果省略这个参数,那么默认就是latest
)
for msg in consumer:
	print(msg)
	print(f"topic = {msg.topic}")  # topic default is string
	print(f"partition = {msg.partition}")
	print(f"value = {msg.value.decode()}")  # bytes to string
	print(f"timestamp = {msg.timestamp}")
	print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg.timestamp / 1000)))

3.运行该python文件

python kafka消费,Python,kafka,分布式,python注意:

该python文件会持续消费kafka数据,如果要停止消费,需手动退出程序。

python kafka消费,Python,kafka,分布式,python

或者可以设置达到特定偏移量退出for循环来停止消费:

lastOffset = 42
for msg in consumer:
	print(msg)
	print(f"topic = {msg.topic}")  # topic default is string
	print(f"partition = {msg.partition}")
	print(f"value = {msg.value.decode()}")  # bytes to string
	print(f"timestamp = {msg.timestamp}")
	print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg.timestamp / 1000)))
	if msg.offset == lastOffset - 1:
		break

 运行效果:

python kafka消费,Python,kafka,分布式,python

 文章来源地址https://www.toymoban.com/news/detail-528635.html

 

 

到了这里,关于python消费kafka数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(47)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(40)
  • kafka 分布式的情况下,如何保证消息的顺序消费?

    目录 一、什么是分布式 二、kafka介绍 三、消息的顺序消费 四、如何保证消息的顺序消费   分布式是指将计算任务分散到多个计算节点上进行并行处理的一种计算模型。在分布式系统中,多台计算机通过网络互联,共同协作完成任务。每个计算节点都可以独立运行,并且可以

    2024年02月10日
    浏览(56)
  • 【大数据工具】Kafka伪分布式、分布式安装和Kafka-manager工具安装与使用

    Kafka 安装包下载地址:https://archive.apache.org/dist/kafka/ 1. Kafka 伪分布式安装 1. 上传并解压 Kafka 安装包 使用 FileZilla 或其他文件传输工具上传 Kafka 安装包: kafka_2.11-0.10.0.0.tgz 解压安装包 2. 编辑配置文件 3. 拷贝并修改配置文件 分别修改 server2.properties、server3.properties 4. 创建日志

    2024年02月14日
    浏览(48)
  • Python分布式任务队列Celery

    Python celery是一个基于Python的分布式任务队列,主要用于任务的异步执行、定时调度和分布式处理。它采用了生产者/消费者模式,通过消息中间件实现多个工作者进程之间的协作。 Python celery的架构主要包括以下组件: 生产者:生产者是负责产生消息的对象。在Python celery中,

    2024年02月16日
    浏览(37)
  • 基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据

    本文尝试基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据。 一、用文心一言AI大模型将需求转化为样例代码 1、第一次对话:“python3写一段从rest服务器获取数据的样例代码” 同时生成了以下注解  这段代码首先定义了一个函数  get_da

    2024年02月03日
    浏览(51)
  • [Python系列] 线程、协程、进程和分布式

            我们在写脚本的时候,经常是单线程跑完了全部,毕竟自顶向下按照我们约定的方法运行下去是最规范的。但是很多时候,比如说合法地爬取一些网页信息,图片和资料啊,或者说一些合法的网络请求,读写文件之类的。如果还是单线程地one by one,那么将会影响我们

    2024年02月16日
    浏览(36)
  • Python中的分布式运行:Selenium Grid

    Selenium Grid 是 Selenium 测试框架的一个关键组件,它为测试人员提供了在多个计算机和浏览器上并行执行测试的能力。通过 Selenium Grid,我们能够更高效地进行大规模测试,并确保应用程序在不同环境中的稳定性和一致性。 我们将从以下几点深入解析Selenium Grid分布式运行的逻辑

    2024年04月12日
    浏览(34)
  • Python图像处理【23】分布式图像处理

    Python 已逐渐成为数据分析/处理领域中的主要语言,这得益于 Python 丰富的第三方库,但是,这些库的设计并未在分布式上进行扩展。 Dask 是为了原生地扩展这些 Python 库及其生态系统而开发的,它能够与现有的 Python 生态系统兼容,将其扩展到多核计算机和分布式集群中。

    2024年03月23日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包