kafka生产者和消费者(python版)

这篇具有很好参考价值的文章主要介绍了kafka生产者和消费者(python版)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

生产者

producer = KafkaProducer(bootstrap_servers=[”ip:port“])
producer.bootstrap_connected()
producer.send(self.topic_name_send,str.encode(json.dumps(message))).get()
producer.close()

消费者

消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题,

consumer = KafkaConsumer(bootstrap_servers=["ip:port"], group_id="组名")
tp = TopicPartition("主题名", 0)
consumer.assign([tp])
consumer.position(tp)
# 修改用户组的偏移量为最新的
# consumer.seek_to_beginning()
last_offset = consumer.end_offsets([tp])[tp]
data_list = []
for msg in consumer:
    # print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
    # 对数据进行解析为dict
    res = json.loads(msg.value)
    data_list.append(res)
    # 用于读取数据结束的退出
    if msg.offset == last_offset - 1:
        break
consumer.commit()
        

kafka工具类

此工具类基本上拿过去就可以用

import datetime
import json
import time

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from env import enviroments,ENV
import logging


class KafkaHelper(object):
    def __init__(self, host=enviroments[ENV]['kafka']["host"], topic_send=enviroments[ENV]['kafka']['kafka_topic_send'],
                 topic_receive=enviroments[ENV]['kafka']['kafka_topic_receive'],
                 group_id = "python_test"):
        self.topic_name_send = str(topic_send)
        self.topic_name_receive = str(topic_receive)
        self.host = host
        self.group_id = group_id


    def send_msg(self, topic="test", msg="默认测试数据"):
        try:
            producer = KafkaProducer(bootstrap_servers=[self.host])
            producer.bootstrap_connected()
        except Exception as e:
            logging.error(f"发送消息时链接kafka失败,突出消息发送,失败原因:{e}")
            return
        message = {
            "topic": topic,
            "from": "python",
            "time": str(datetime.datetime.utcnow()),
            "data": msg
        }
        try:
            producer.send(self.topic_name_send,str.encode(json.dumps(message))).get()
        except Exception as e:
            logging.error(f"kafka发送数据失败,要发送的数据:{message},失败原因:{e}")
        finally:
            producer.close()

    # 消费者链接可能存在每次消费一个的情况,需要不断的创建和销毁消费者
    def get_one_data(self) -> list:
        try:
            consumer = KafkaConsumer(bootstrap_servers=[self.host], group_id=self.group_id)
            tp = TopicPartition(self.topic_name_receive, 0)
            consumer.assign([tp])
            consumer.position(tp)
        except Exception as e:
            logging.error(f"读取消息时链接kafka失败,突出消息发送,失败原因:{e}")
            return
        data_list = []
        try:
            for msg in consumer:
                # print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
                # 对数据进行解析为dict
                res = json.loads(msg.value)
                data_list.append(res)
                consumer.commit()
                consumer.close()
        except Exception as e:
            logging.error(f"读取kafka单条失败,失败原因:{e}")
        finally:
            consumer.close()
            return data_list

    def receive_msg(self)->list:
        try:
            consumer = KafkaConsumer(bootstrap_servers=[self.host], group_id=self.group_id)
            tp = TopicPartition(self.topic_name_receive, 0)
            consumer.assign([tp])
            consumer.position(tp)
            # now_offset = consumer.offsets_for_times(timestamps=int(time.time()*1000000))
            # 修改用户组的偏移量为最新的
            # consumer.seek_to_beginning()

            print(consumer.config.keys())
            last_offset = consumer.end_offsets([tp])[tp]
            print(last_offset)
        except Exception as e:
            logging.error(f"读取消息时链接kafka失败,突出消息发送,失败原因:{e}")
            return

        data_list = []
        try:
            for msg in consumer:
                # print(msg.topic, msg.partition, msg.offset, msg.timestamp, msg.key, msg.value)
                # 对数据进行解析为dict
                res = json.loads(msg.value)
                data_list.append(res)
                if msg.offset == last_offset - 1:
                    break
            consumer.commit()
        except Exception as e:
            logging.error(f"读取kafka单条失败,失败原因:{e}")
        finally:
            consumer.close()
            return data_list

疑问

  1. 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

本人小白一枚,有什么不对的地方欢迎大家指出,也可以加q一起讨论技术哦(不要嫌弃我菜) 1147528161文章来源地址https://www.toymoban.com/news/detail-604395.html

到了这里,关于kafka生产者和消费者(python版)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Kafka生产者与消费者api示例

    Kafka生产者与消费者api示例

      一个正常的生产逻辑需要具备以下几个步骤 配置生产者参数及创建相应的生产者实例 构建待发送的消息 发送消息 关闭生产者实例 采用默认分区方式将消息散列的发送到各个分区当中    对于properties配置的第二种写法,相对来说不会出错,简单举例:   1.kafka的生产者可

    2024年02月07日
    浏览(15)
  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(12)
  • Kafka:主题创建、分区修改查看、生产者、消费者

    Kafka:主题创建、分区修改查看、生产者、消费者

    1.创建主题 2.查看所有主题 3.查看详细主题 序号从0开始计算 Partition:分区数,该主题有3个分区 Replica:副本数,该主题有3个副本 Leader:副本数中的主的序号,生产消费的对象 1.修改分区数 修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错 在根目录kaf

    2024年02月11日
    浏览(16)
  • Kafka官方生产者和消费者脚本简单使用

    怎样使用Kafka官方生产者和消费者脚本进行消费生产和消费?这里假设已经下载了kafka官方文件,并已经解压. 这就可以见到测试kafka对应topic了.

    2024年02月04日
    浏览(13)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(11)
  • Linux安装Kafka,创建topic、生产者、消费者

    Linux安装Kafka,创建topic、生产者、消费者

    1.创建安装目录/usr/local/kafka mkdir /usr/local/kafka 2.进入安装包目录 cd /usr/local/kafka  3.下载安装包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解压安装包 tar -zxvf kafka_2.12-3.3.1.tgz 5.进入cd kafka_2.12-3.3.1目录 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    浏览(18)
  • 探究:kafka生产者/消费者与多线程安全

    探究:kafka生产者/消费者与多线程安全

    目录 1. 多线程安全 1.1. 生产者是多线程安全的么? 1.1. 消费者是多线程安全的么? 2. 消费者规避多线程安全方案 2.1. 每个线程维护一个kafkaConsumer 2.2. [单/多]kafkaConsumer实例 + 多worker线程 2.3.方案优缺点对比         Kafka生产者是 线程安全 的,可以在多个线程中共享一个

    2023年04月26日
    浏览(11)
  • Python多线程Thread——生产者消费者模型 python队列与多线程——生产者消费者模型

    下面面向对象的角度看线程 那么你可以试试看能不能用面向对象的方法实现生产者消费者模型吧。

    2024年02月09日
    浏览(15)
  • Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)

    Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。 1 发送消息到 Kafka Kafka 生产者负责将消息发布到指定的主题。以下是一个简单的生

    2024年02月03日
    浏览(20)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包