python-kafka客户端封装

这篇具有很好参考价值的文章主要介绍了python-kafka客户端封装。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

本文对python的kafka包做简单封装,方便kafka初学者使用。包安装:

pip install kafka-python

封装代码

kafka_helper.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from typing import List


class KProducer:
    def __init__(self, bootstrap_servers: List, key_serializer=lambda m: json.dumps(m).encode("ascii"),
                 value_serializer=lambda m: json.dumps(m).encode("ascii"), compression_type=None):
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                buffer_memory=33554432,
                batch_size=1048576,
                max_request_size=1048576,
                key_serializer=key_serializer,
                value_serializer=value_serializer,
                compression_type=compression_type  # 压缩消息发送 gzip lz4 snappy zstd
            )
            print("connect success, kafka producer info {0}".format(bootstrap_servers))
        except Exception as e:
            raise Exception("connect kafka failed, {}.".format(e))

    def sync_send(self, topic: str, data):
        """
        同步发送数据
        :param data:  发送数据
        :param topic: 主题
        :return: partition, offset
        """
        try:
            future = self.producer.send(topic, data)
            record_metadata = future.get(timeout=10)  # 同步确认消费
            partition = record_metadata.partition  # 数据所在的分区
            offset = record_metadata.offset  # 数据所在分区的位置
            print("save success, partition: {}, offset: {}".format(partition, offset))
            return partition, offset
        except Exception as e:
            raise Exception("Kafka sync send failed, {}.".format(e))

    def async_send(self, topic: str, data):
        """
        异步发送数据
        :param data:  发送数据
        :param topic: 主题
        :return: None
        """
        try:
            self.producer.send(topic, data)
            print("send data:{}".format(data))
        except Exception as e:
            raise Exception("Kafka asyn send failed, {}.".format(e))

    def async_callback(self, topic: str, data):
        """
        异步发送数据 + 发送状态处理
        :param data:发送数据
        :param topic: 主题
        :return: None
        """
        try:
            for item in data:
                self.producer.send(topic, item).add_callback(self.__send_success).add_errback(self.__send_error)
                self.producer.flush()  # 批量提交
        except Exception as e:
            raise Exception("Kafka asyn send fail, {}.".format(e))

    @staticmethod
    def __send_success():
        """异步发送成功回调函数"""
        print("save success")
        return

    @staticmethod
    def __send_error():
        """异步发送错误回调函数"""
        print("save error")
        return

    def close(self):
        self.producer.close()


class KConsumer:
    def __init__(self, bootstrap_servers: List, topic: str, group_id: str, key_deserializer=None,
                 value_deserializer=None, auto_offset_reset="latest"):
        self.topic = topic
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=bootstrap_servers,
                group_id=group_id,
                enable_auto_commit=False,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                max_poll_records=50,
                max_poll_interval_ms=30000,
                metadata_max_age_ms=3000,
                key_deserializer=key_deserializer,
                value_deserializer=value_deserializer,
                auto_offset_reset=auto_offset_reset
            )
            self.consumer.subscribe(topics=[self.topic])
            print("connect to kafka and subscribe topic success")
        except Exception as e:
            raise Exception("Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.print_exc()))

    def get_consumer(self):
        """
        返会可迭代consumer
        :return: consumer
        """
        return self.consumer

    def set_topic(self, topic: str):
        """
        订阅主题
        :param topic: 主题
        :return: None
        """
        self.topic = topic
        self.consumer.subscribe(topics=[self.topic])

    def get_message_by_partition_offset(self, partition, offset):
        """
        通过partition、offset获取一个消息
        :param partition: 分区
        :param offset: 游标、下标、序号
        :return: message,消息
        """
        self.consumer.unsubscribe()
        partition = TopicPartition(self.topic, partition)
        self.consumer.assign([partition])
        self.consumer.seek(partition, offset=offset)
        for message in self.consumer:
            return message

测试代码

kafka_test.py

from kafka_helper import KProducer,KConsumer
import json

def sync_send_test(bootstrap_servers,topic,json_format=True):
    value = {
        "send_type": "sync_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        p.sync_send(value,topic)
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers,key_serializer=None,value_serializer=None)
        v = bytes('{}'.format(json.dumps(value)), 'utf-8')
        p.sync_send(v,topic)
    p.close()

def async_send_test(bootstrap_servers,topic,json_format=True):
    value = {
        "send_type": "async_send",
        "name":"lady_killer",
        "age":18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        p.asyn_send(value,topic)
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers,key_serializer=None,value_serializer=None)
        v = bytes('{}'.format(json.dumps(value)), 'utf-8')
        p.asyn_send(v,topic)
    p.close()

def consumer_test(bootstrap_servers,topic):
    c = KConsumer(bootstrap_servers=bootstrap_servers,topic=topic,group_id='test',auto_offset_reset="earliest")
    for data in c.get_consumer():
        print(type(data.value),data.value)
        print(json.loads(data.value))

def get_one_msg(bootstrap_servers,topic,partition,offset):
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic, group_id='test', auto_offset_reset="earliest")
    msg = c.get_message_by_partition_offset(partition,offset)
    print(msg)


if __name__ == '__main__':
    bootstrap_servers = ["kafka:9092"]
    topic = "demodata"
    # 测试生产
    sync_send_test(bootstrap_servers=bootstrap_servers,topic=topic)
    async_send_test(bootstrap_servers=bootstrap_servers,topic=topic)
    sync_send_test(bootstrap_servers=bootstrap_servers,topic=topic,json_format=False)
    async_send_test(bootstrap_servers=bootstrap_servers,topic=topic,json_format=False)
    # 测试消费
    consumer_test(bootstrap_servers=bootstrap_servers,topic=topic)
    # get_one_msg(bootstrap_servers=bootstrap_servers,topic=topic,partition=0,offset=0)

参考

Kafka入门,这一篇就够了(安装,topic,生产者,消费者)文章来源地址https://www.toymoban.com/news/detail-708303.html

到了这里,关于python-kafka客户端封装的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 探索高效 Python HTTP 客户端:HTTPX

    项目地址:https://gitcode.com/encode/httpx 在现代 Web 开发中,高效的 HTTP 请求库是不可或缺的一部分。这就是我们要向您介绍的 HTTPX 的地方。HTTPX 是一个高性能、功能丰富的 Python HTTP 客户端,旨在成为 requests 库的一个有竞争力的替代品。 HTTPX 提供了全面的异步和同步支持,包括完

    2024年04月14日
    浏览(54)
  • Python 的下一代 HTTP 客户端

    迷途小书童 读完需要 9 分钟 速读仅需 3 分钟 1 环境 windows 10 64bit python 3.8 httpx 0.23.0 2 简介 之前我们介绍过使用 requests ( https://xugaoxiang.com/2020/11/28/python-module-requests/ ) 来进行 http 操作,本篇介绍另一个功能非常类似的第三方库 httpx,它提供了同步和异步的 API,同时支持 HTTP/

    2024年02月12日
    浏览(51)
  • Redis - Python 客户端基本使用指南

    参考:python 模块Redis模块,连接Redis数据库 Python 中的 Redis 客户端库允许开发者与 Redis 数据库进行交互。这些库允许在 Python 中连接到 Redis、执行命令以读取或写入数据,并处理 Redis 数据。 以下是一些常见的 Python Redis 客户端库: redis 库:是 Python 中最常用的 Redis 客户端库之

    2024年02月08日
    浏览(44)
  • Kafka-客户端使用

    Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API 封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来

    2024年02月22日
    浏览(53)
  • python 模块urllib3 HTTP 客户端库

    官网文档地址:https://urllib3.readthedocs.io/en/stable/reference/index.html 一、安装 二、基本使用 三、urllib3.request() 发送请求 四、urllib3.PoolManager() 创建和管理连接池,以便在发送多个 HTTP 请求时重用连接 http.request(method,url,body,fields,headers) 发送请求 method(字符串):指定请求的 HTTP 方

    2024年02月11日
    浏览(49)
  • Python MQTT客户端 paho-mqtt

    Python中MQTT Python有许多优秀的MQTT客户端,比较有代表性的有paho-mqtt、hbmqtt、gmqtt等,各有特色 paho-mqtt 有完善的官方文档,代码风格易于理解,目前新版本支持 MQTT 5.0 hbmqtt 使用 asyncio 库实现,可以优化网络 I/O 带来的延迟,但是代码风格不友好,文档较少,不支持 MQTT 5.0,主要后续

    2024年02月04日
    浏览(68)
  • Python启动UDP服务,监听并接收客户端数据

    可以使用Python的socket库实现UDP协议的验证,以下是一个简单的示例代码: 服务器: 客户端 以上代码创建了一个UDP socket,并绑定到本地的IP和端口8888。接着使用 recvfrom() 方法接收数据,并使用 sendto() 方法发送数据。最后,关闭socket。 可以使用两个终端分别运行该程序,并观

    2024年02月15日
    浏览(48)
  • kafka 02——三个重要的kafka客户端

    请参考下面的文章: Kafka 01——Kafka的安装及简单入门使用. AdminClient API: 允许管理和检测Topic、Broker以及其他Kafka对象。 Producer API: 发布消息到一个或多个API。 Consumer API: 订阅一个或多个Topic,并处理产生的消息。 如下: 完整的pom 关于配置,可参考官网: https://kafka.apa

    2024年02月13日
    浏览(47)
  • kafka客户端应用参数详解

    Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可: 1、消息发送者主流程  然后可以使用Kafka提供的Producer类,快速发送消息。 ​ 整体来说,构建Producer分为三个步骤: 设置Producer核心属性  :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTST

    2024年02月07日
    浏览(54)
  • Modbus协议及基于Python的ModbusTCP客户端实现

    Modbus 协议是由 Modicon 公司(现在的施耐德电气 Schneider Electric )于1979年为使用可编程逻辑控制器(PLC)通信而推出,主要建立在物理串口、以太网 TCP/IP 层之上,目前已经成为工业领域通信协议的业界标准,广泛应用在工业电子设备之间的互联。 Modbus技术文档 Modbus 是OSI模型

    2024年02月07日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包