安装 pykafka文章来源地址https://www.toymoban.com/news/detail-587367.html
pip install pykafka
一、消费kafka消息
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
from vpn_data_handler import handler_data
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'
class KConsumer(object):
"""kafka 消费者; 动态传参,非配置文件传入;
kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
"""
_encode = "UTF-8"
def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
""" 初始化kafka的消费者;
1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
Args:
topics: str; kafka 的消费主题;
bootstrap_server: list; kafka 的消费者地址;
group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
"""
if bootstrap_server is None:
bootstrap_server = bootstrap_servers
self.client = KafkaClient(hosts=bootstrap_server)
# 选择要消费的topic
vpn_topic = self.client.topics[topics]
self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
consumer_timeout_ms=200,
auto_commit_enable=True,# 自动提交偏移量
auto_offset_reset=OffsetType.LATEST) #LATEST 获取当前偏移量最新消息 EARLIEST从头开始获取信息
def recv(self):
"""
接收消费中的数据
Returns:
"""
return self.consumer
def main():
"""
kafka消费队列入口
:param topic:
:return:
"""
obj = KConsumer(topics="topics_name")
while True:
for message in obj.recv():
data = eval(message.value.decode('utf-8'))
handler_data(data)
if __name__ == '__main__':
main()
二、生产者推送消息
#!/usr/bin/python # -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts="10.XX0.XX0.XX4:9092") # 可接受多个client # 查看所有的topic # print(client.topics) topic = client.topics['test_78'] # 选择一个topic message = "test message2 test message2" with topic.get_sync_producer() as producer: producer.produce(bytes(message, encoding='utf8')) #python3需要编码 print(message)
文章来源:https://www.toymoban.com/news/detail-587367.html
到了这里,关于python 实时获取kafka消费队列信息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!