python 读写kafka

这篇具有很好参考价值的文章主要介绍了python 读写kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

    1. 安装pykafka

pip install pykafka

    2. 生产者

from pykafka import KafkaClient
 
def get_kafka_producer(hosts, topics):
    client = KafkaClient(hosts=hosts)
    print(client.topics)
    topic = client.topics[topics]
    producer = topic.get_producer()
    return producer

测试

hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
producer = get_kafka_producer(hosts, topics)   
for i in range(10):
    msg = "test message " + str(i)
    # msg = bytes(msg, encoding='utf-8')    
    # producer.produce(msg)
    producer.produce(msg.encode())
producer.stop()

3. 消费者

def get_kafka_consumer(hosts, topics):
    client = KafkaClient(hosts=hosts)
    topic=client.topics[topics]
    consumer = topic.get_balanced_consumer(consumer_group='test_kafka_topic', auto_commit_enable=True,
zookeeper_connect='192.168.20.201:2181,192.168.20.202:2181,192.168.20.203:2181',
managed=True, consumer_timeout_ms=1000)
    # managed=True,即使用新式reblance分区方法,不需zk;managed=False则需通过zk来实现reblance      
    return consumer

测试文章来源地址https://www.toymoban.com/news/detail-807115.html

hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
consumer = get_kafka_consumer(hosts, topics)   
for msg in consumer:
    print(msg)
    if msg is not None:
        print(msg.offset)
        print(msg.value)

到了这里,关于python 读写kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka学习笔记-- 文件清理策略与高效读写数据

    本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。 如有侵权、联系速删 视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面) Kafka 中默认的日志保存时间为 7 天,可以

    2024年01月19日
    浏览(27)
  • flink读写kafka保证端到端exactly-once

    内容: 本文主要介绍使用flink读写kafka,如何保证exactly-once Kafka端到端一致性需要注意的点: Flink任务需要开启checkpoint配置为CheckpointingMode.EXACTLY_ONCE Flink任务FlinkKafkaProducer需要指定参数Semantic.EXACTLY_ONCE Flink任务FlinkKafkaProducer配置需要配置transaction.timeout.ms,checkpoint间隔(代码指定

    2023年04月14日
    浏览(23)
  • 头歌:Python开发技术—文件和异常3( 答案+详细注释)第1关:读取文件内容+第2关:素数写入文件+第3关:输出文件目录+第4关:读写json文件

    自己敲一遍这个代码,注释我写的超级详细,一定可以明白! 内容原创,请勿转载  知识点学习参考: 1.基本读写常用指令: 一文搞懂Python文件读写 - 知乎 (zhihu.com) 2.文件遍历知识总结:  (205条消息) python遍历文件夹下的所有文件_python遍历d盘下面所有文件(排除掉文件夹)

    2024年02月05日
    浏览(161)
  • 【开发语言】C语言与Python的互操作详解

    博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 本人就职于国际知名终端厂商,负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G算力网络技术标准研究。 博客内容主要围绕:        5G/6G协议

    2024年02月10日
    浏览(38)
  • C语言文件操作(文件读写)

    本文主要介绍C语言中文件操作的相关内容(例:文件读、写等相关函数)。 在对计算机的使用中我们几乎离不开文件。例如常见的有word 文档,txt文本文件,图片文件、音频文件等。 文件是以计算机硬盘为载体存储在计算机上的信息集合。是数据源的一种,最主要的作用是保

    2024年02月02日
    浏览(29)
  • C语言中文件的读写

    不争输赢,只问对错 文章目录 一、文件的概述   二、什么是读写文件 三、文件处理的函数 1.文件的打开与关闭 2.文件的顺序读写 文件的顺序读写相关函数 scanf 和 printf 家族的对比及其区分 3.文件的随机读写       文件的随机读写函数 四、文件缓冲区 五、文件的读取结束

    2024年02月14日
    浏览(23)
  • C语言文件的读写操作

    目录 一,文件 1.文本模式,二进制模式 2.标准文件 二,文件的打开和关闭 1.文件指针 1.1文件的打开  1.2文件的关闭 三,文件的顺序读写  1.fgetc 2.fputc 3.fgets 4.fputs 5.fprintf 6.fscanf 7.fwrite 8.fread  9.表格 四,文件的随机读写 1,fseek   2,ftell 3,rewind 判断文件结束 1,feof 2,判断

    2024年02月10日
    浏览(29)
  • 如何使用C语言进行读写文件

    目录 文章目录 前言 什么是文件 文件名 文件的打开与关闭 文件的打开与关闭 文件的顺序读写 字符读写 文本行的读写 格式化输入输出 二进制文件输入输出 文件的随机读写  文件操作知识拓展  文本文件和二进制文件 文件结束判定  文件缓冲区 总结 文件操作可能看起来很

    2024年02月07日
    浏览(41)
  • 【c语言】文本文件的读写操作

    创作不易,本篇文章如果帮助到了你,还请点赞 关注支持一下♡𖥦)!! 主页专栏有更多知识,如有疑问欢迎大家指正讨论,共同进步! 🔥c语言系列专栏:c语言之路重点知识整合 🔥 给大家跳段街舞感谢支持!ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ 本文基

    2024年01月17日
    浏览(25)
  • C语言入门教程||C语言 文件读写||C语言 预处理器

    本章我们将介绍 C 程序员如何创建、打开、关闭文本文件或二进制文件。 一个文件,无论它是文本文件还是二进制文件,都是代表了一系列的字节。C 语言不仅提供了访问顶层的函数,也提供了底层(OS)调用来处理存储设备上的文件。本章将讲解文件管理的重要调用。 您可

    2024年02月02日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包