Python 中使用 Kafka 模块进行鉴权数据推送和消费

这篇具有很好参考价值的文章主要介绍了python 中使用Kafka模块进行鉴权数据推送和消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

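最近刚好要用到 Kafka 进行数据传输,并且需要鉴权,于是研究了一下 Kafka 的鉴权推送和消费,现在将代码放出来,有兴趣的可以看一下。不同集群配置的鉴权机制(如 PLAIN、SCRAM-SHA-256)和安全协议(如 SASL_PLAINTEXT、SASL_SSL)各有不同,代码中的 sasl_mechanism 和 security_protocol 需要与服务端配置保持一致,这一点需要注意!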

一、生产者

生产者采用异步推送的形式,并加入了计数模块,用于统计已推送、已成功回调和回调失败的消息数量,以防出现消息推送后尚未收到回调、脚本就已经退出的情况。

import datetime
from loguru import logger
import time
import random
import json
import gc
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError
import traceback


class KProducer:
    """Asynchronous Kafka producer wrapper with optional SASL authentication.

    Messages are sent without waiting for the broker. Delivery results arrive
    through ``send_success`` / ``send_error``, which maintain global and
    per-key counters so the caller can see how many sends are still
    unacknowledged (``item_count``) before letting the script exit.
    """

    # Seconds ``send_error`` waits before re-sending a failed message.
    RETRY_BACKOFF_SECONDS = 10

    def __init__(self, bootstrap_servers, topic, sasl_plain_username, sasl_plain_password, retry_max_times=5, type=0):
        """
        Create the underlying producer and reset all counters.

        :param bootstrap_servers: broker addresses handed to ``KafkaProducer``
        :param topic: topic every message is sent to
        :param sasl_plain_username: account used when authentication is enabled
        :param sasl_plain_password: password used when authentication is enabled
        :param retry_max_times: a message is re-sent while its attempt number is <= this value
        :param type: ``1`` or ``'1'`` enables SASL authentication; any other value connects without it
        """
        self.producer = self.get_kafkaProduct(bootstrap_servers, sasl_plain_username, sasl_plain_password, type)
        self.topic = topic
        self.flage = True
        self.text = ''
        self.retry_max_times = retry_max_times
        # At most ``error_max_cout`` problems are logged; ``error_cout`` tracks how many were.
        self.error_max_cout = 1000
        self.error_cout = 0

        self.item_count = 0             # sends whose callback has not fired yet
        self.total_push_count = 0       # sends handed to the producer
        self.total_receive_s_count = 0  # success callbacks received
        self.total_receive_e_count = 0  # error callbacks received

        # key_name -> {'push_num': ..., 'callbakc_num': ..., 'error_num': ...}
        self.kafka_items = {}

    def get_kafkaProduct(self, bootstrap_servers, sasl_plain_username, sasl_plain_password, type=0):
        """
        Build a ``KafkaProducer``, trying up to five times with a 5 s pause after each failure.

        :param bootstrap_servers: broker addresses
        :param sasl_plain_username: SASL account (only used when ``type`` is ``1`` / ``'1'``)
        :param sasl_plain_password: SASL password (only used when ``type`` is ``1`` / ``'1'``)
        :param type: ``1`` or ``'1'`` to connect with SCRAM-SHA-256 over SASL_PLAINTEXT
        :return: the producer, or ``None`` when every attempt failed
        """
        # Options shared by the authenticated and the plain connection.
        options = {
            'bootstrap_servers': bootstrap_servers,
            # Keys and values are JSON-encoded before sending.
            'key_serializer': lambda k: json.dumps(k).encode(),
            'value_serializer': lambda v: json.dumps(v).encode(),
            # 'all': report success only after the write is fully acknowledged.
            'acks': 'all',
            # Automatic re-sends on transient failures; keeping a single
            # in-flight request avoids reordering when a retry happens.
            'retries': 5,
            'max_in_flight_requests_per_connection': 1,
            'request_timeout_ms': 10000,  # library default is 30000 ms
            'batch_size': 1048576,
        }
        if type in [1, '1']:  # authentication requested
            options.update(
                sasl_mechanism="SCRAM-SHA-256",  # alternative: PLAIN
                security_protocol='SASL_PLAINTEXT',  # alternative: SASL_SSL
                api_version=(0, 10),
                sasl_plain_username=sasl_plain_username,
                sasl_plain_password=sasl_plain_password,
            )
        else:
            options['linger_ms'] = 300

        for _ in range(5):
            try:
                return KafkaProducer(**options)
            except Exception as e:
                logger.error(f'【kafka_error】{bootstrap_servers}  报错: {e} kafka连接断开,重连中!')
                time.sleep(5)

        return None

    def _log_limited(self, level, message):
        """Log ``message`` at ``level`` unless ``error_max_cout`` entries were already logged."""
        if self.error_cout < self.error_max_cout:
            self.error_cout += 1
            getattr(logger, level)(message)

    def _retry_send(self, data, key, now_time):
        """Re-send with the next attempt number while attempts remain; always yields ``''``."""
        if now_time <= self.retry_max_times:
            return self.asyn_producer_callback(data=data, key=key, now_time=now_time + 1)
        return ''

    def asyn_producer_callback(self, data, key, now_time=1):
        """
        Send one message asynchronously and register the delivery callbacks.

        The per-key statistics bucket is named after the text between the first
        ``'|'`` (searched from index 1) and the last ``'|'`` of ``key``.

        :param data: JSON-serialisable message value
        :param key: message key, e.g. ``'database|table|timestamp'``
        :param now_time: attempt number of this send (1 for the first try)
        :return: always ``''``
        """
        if not self.producer:
            # Previously this was silent; make the dropped message visible.
            self._log_limited('error', f'【kafka_error】key: {key} => producer is not connected, message dropped.')
            return ''

        try:
            key_name = key[key.find('|', 1) + 1:key.rfind('|', 1)]
            stats = self.kafka_items.setdefault(key_name, {'push_num': 0, 'callbakc_num': 0, 'error_num': 0})
            stats['push_num'] += 1

            # send() is asynchronous; calling .get() on the returned future
            # would turn it into a blocking send.
            future = self.producer.send(self.topic, value=data, key=key)
            future.add_callback(self.send_success, key_name=key_name)
            future.add_errback(self.send_error, data=data, key=key, now_time=now_time, key_name=key_name)
            self.item_count += 1
            self.total_push_count += 1

        except KafkaTimeoutError as err:
            self._log_limited('error', f'【Kafka】超时 {err}')
            return self._retry_send(data, key, now_time)

        except Exception as err:
            self._log_limited('error', f'【kafka_error】key: {key} => 异步发送错误: {err} ,重新发送。')
            return self._retry_send(data, key, now_time)

        return ''

    def send_success(self, *args, **kwargs):
        """Delivery-success callback: update the in-flight, per-key and total success counters.

        Expects the ``key_name`` keyword registered by ``asyn_producer_callback``.
        """
        if self.item_count > 0:
            self.item_count -= 1
        self.kafka_items[kwargs['key_name']]['callbakc_num'] += 1
        self.total_receive_s_count += 1

    def send_error(self, excp=None, *args, **kwargs):
        """
        Delivery-failure callback: update the counters and re-send while attempts remain.

        This must be a plain function. The earlier version contained ``yield``,
        which made it a generator function, so invoking it as an errback only
        created a generator object and none of the code below ever ran.

        :param excp: the exception reported by the producer
        :param kwargs: ``data``, ``key``, ``now_time`` and ``key_name`` as registered
            by ``asyn_producer_callback``
        :return: ``''`` after a re-send was attempted, otherwise ``None``
        """
        if self.item_count > 0:
            self.item_count -= 1
        self.total_receive_e_count += 1

        # Read everything up front so later code never touches unbound names.
        data = kwargs.get('data')
        key = kwargs.get('key')
        now_time = kwargs.get('now_time', 1)
        key_name = kwargs.get('key_name')

        try:
            self.kafka_items[key_name]['error_num'] += 1
        except Exception as e:
            # A bookkeeping problem must not prevent the re-send below.
            self._log_limited('info', f'异步发送错误回调函数错误: {excp} | {e} | {args} |------| {kwargs}')

        if key is None or now_time > self.retry_max_times:
            self._log_limited('error', f'【kafka_error】key: {key} => send failed, giving up after attempt {now_time}: {excp}')
            return None

        self._log_limited('error', f'【kafka_error】key: {key} => 异步发送错误: {excp} ,重新发送。')
        # NOTE(review): this sleep runs on whatever thread invokes the errback,
        # presumably the producer's I/O thread, which it would stall - confirm.
        time.sleep(self.RETRY_BACKOFF_SECONDS)
        return self.asyn_producer_callback(data, key, now_time + 1)

    def close_producer(self):
        """Close the underlying producer, logging instead of raising on failure."""
        try:
            self.producer.close()
        except Exception as e:
            logger.error(f'【kafka_error】kafka 关闭失败 原因:{e}')
            


# Connection settings - fill in the broker list, topic and credentials before running.
kafka_config = {
    "bootstrap_servers": [],
    "topic": "",
    "sasl_plain_username": "",  # SASL account
    "sasl_plain_password": "",  # SASL password
    "type": 1,  # 1 enables SASL authentication
}

topic = kafka_config.get('topic', '')
kakfka_producer = KProducer(topic=kafka_config['topic'],
                            bootstrap_servers=kafka_config['bootstrap_servers'],
                            sasl_plain_username=kafka_config['sasl_plain_username'],
                            sasl_plain_password=kafka_config['sasl_plain_password'],
                            type=kafka_config['type'])
# Message payload
item_new = {
    'database': '',
    'table': '',
    'data': 'msg'
}
# Unique key of the message: database|table|millisecond timestamp
keys = '|'.join([item_new['database'], item_new['table'], str(int(time.time() * 1000))])
kakfka_producer.asyn_producer_callback(item_new, key=keys)
# The send above is asynchronous: close the producer before the script ends
# so the buffered message is delivered instead of being dropped at exit.
kakfka_producer.close_producer()

二、消费者

消费者的代码就比较简单,只需要加入鉴权的账号密码以及加密方式即可。

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
import datetime
import time

def consumer_demo():
    """Consume the configured topic with SASL/SCRAM authentication and print each message.

    Keys and values are decoded as JSON. A message without a key (or value)
    is printed with ``None`` in its place instead of raising ``AttributeError``.
    The loop runs for as long as the consumer keeps yielding messages.
    """
    config = {
        "bootstrap_servers": [""],
        "topic": "",
        "sasl_plain_username": "",
        "sasl_plain_password": "",
        "type": 1
    }

    def _decode_json(raw):
        """Decode a JSON payload; a missing (``None``) payload is passed through."""
        return None if raw is None else json.loads(raw.decode())

    consumer = KafkaConsumer(
        config['topic'],
        bootstrap_servers=config['bootstrap_servers'],
        sasl_mechanism="SCRAM-SHA-256",  # alternative: PLAIN
        security_protocol='SASL_PLAINTEXT',  # alternative: SASL_SSL
        api_version=(0, 10),
        sasl_plain_username=config['sasl_plain_username'],
        sasl_plain_password=config['sasl_plain_password'],
    )

    # Mask the password so credentials do not end up in console output.
    print('等待接收....', {**config, 'sasl_plain_password': '***'})
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            _decode_json(message.key),
            _decode_json(message.value),
        ))

# Start the consumer demo only when this file is executed as a script.
if __name__ == '__main__':
    consumer_demo()

以上就是全部代码,有兴趣的可以了解一下,我也当作记录。文章来源地址:https://www.toymoban.com/news/detail-539668.html

到了这里,关于python 中使用Kafka模块进行鉴权数据推送和消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包