python 自建kafka消息生成和消费小工具

这篇具有很好参考价值的文章主要介绍了python 自建kafka消息生成和消费小工具。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

要将 Kafka 的消息生产和消费转换为 API 接口,我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子,使用 Flask 创建 API 来生成和消费 Kafka 消息。

1. 安装所需的库

pip install kafka-python flask

2. 创建 Flask API

from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumer

app = Flask(__name__)

# 配置 Kafka
KAFKA_BROKER_URL = 'localhost:9092'
TOPIC = 'test_topic'
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)


@app.route('/send', methods=['POST'])
def send_message():
    message = request.json.get('message')
    if message:
        producer.send(TOPIC, value=message.encode('utf-8'))
        return jsonify({"status": "success", "message": "Message sent!"}), 200
    else:
        return jsonify({"status": "error", "message": "Message cannot be empty!"}), 400


@app.route('/receive', methods=['GET'])
def receive_message():
    consumer = KafkaConsumer(TOPIC, bootstrap_servers=KAFKA_BROKER_URL, auto_offset_reset='earliest')
    messages = []
    for message in consumer:
        messages.append(message.value.decode('utf-8'))
        if len(messages) > 5:  # 只收集最近的5条消息,可以根据需要调整
            break
    return jsonify(messages)


if __name__ == '__main__':
    app.run(debug=True, port=5000)

这个 Flask 应用程序定义了两个端点:

  • /send: 它接受 POST 请求并发送消息到 Kafka。
  • /receive: 它返回 Kafka 主题中的最近消息。

3. 使用 API

  • 发送消息:
curl -X POST http://localhost:5000/send -H "Content-Type: application/json" -d '{"message": "Hello, Kafka!"}'
  • 接收消息:
curl http://localhost:5000/receive

这只是一个简单的示例,您可能需要添加错误处理、日志记录、认证、消息序列化和反序列化等功能,以满足更复杂的需求。文章来源地址https://www.toymoban.com/news/detail-731923.html

到了这里,关于python 自建kafka消息生成和消费小工具的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka消息消费流程详解

    在分布式系统中,Kafka是一种常用的消息队列系统,用于实现高可靠性的消息传递。本文将介绍Kafka消息消费的流程,并提供相应的示例代码。 Kafka消费者的流程可以概括为以下几个步骤: 创建Kafka消费者实例; 订阅一个或多个主题; 拉取消息记录; 处理消息; 提交消费位

    2024年02月09日
    浏览(72)
  • kafka入门(一):kafka消息发送与消费

    kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 Consumer Group (消费者组) 每个消费

    2024年04月12日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(48)
  • kafka如何避免消息重复消费

    Kafka 避免消息重复消费通常依赖于以下策略和机制: Kafka使用Consumer Group ID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的Group ID。如果多个消费者属于同一个Group ID,那么它们将共享消息,但每个分区的消息只能由一个消费者处理。 Kafka会记录每个消费者组消

    2024年01月15日
    浏览(44)
  • Kafka 消息发送和消费流程

    流程如下: Producer 端直接将消息发送到 Broker 中的 Leader 分区中 Broker 对应的 Leader 分区收到消息会先写入 Page Cache,定时刷盘进行持久化(顺序写入磁盘) Follower 分区拉取 Leader 分区的消息,并保持与 Leader 分区数据一致,待消息拉取完毕后需要给 Leader 分区回复 ACK 确认消息

    2024年02月12日
    浏览(38)
  • 查看kafka消息消费堆积情况

    查看主题命令 展示topic列表 描述topic 查看topic某分区偏移量最大(小)值 增加topic分区数 删除topic:慎用,只会删除zookeeper中的元数据,消息文件须手动删除 方法一: 方法二: 待验证 查看topic消费进度,必须参数为–group, 不指定–topic,默认为所有topic, 列出所有主题中的

    2024年03月13日
    浏览(81)
  • Kafka系列之消息重新消费

    需求来源,在review前人留下的屎山代码时发现如下截图所示的代码片段: 也就是说代码是空实现的。另外,从类名定义也知道需求未实现。 于是有此需求:已经消费过的消息重新消费。 调研下来,主要有以下3种可能性方案 修改偏移量,即offset,可通过脚本快速实现 新增

    2024年02月02日
    浏览(35)
  • kafka 消息日志原理 & 指定偏移量消费 & 指定时间戳消费

    Apache Kafka日志存储在物理磁盘上各种数据的集合,日志按照topic分区进行文件组织,每一个分区日志由一个或者多个文件组成。生产者发送的消息被顺序追加到日志文件的末尾。 如上图所述,Kafka主题被划分为3个分区。在Kafka中,分区是一个逻辑工作单元,其中记录被顺序附

    2024年02月15日
    浏览(46)
  • kafka 如何保证消息的顺序消费

    在Kafka分布式集群中,要保证消息的顺序消费,您可以采取以下措施: 分区策略 :Kafka的主题可以分为多个分区,每个分区内的消息是有序的。因此,首先要确保生产者将相关的消息发送到同一个分区。这可以通过生产者的分区策略来实现。默认情况下,Kafka会使用基于消息

    2024年02月06日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包