Kafka Producer/Consumer 关系解释及测试demo

这篇具有很好参考价值的文章主要介绍了Kafka Producer/Consumer 关系解释及测试demo。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Producer/Consumer

Kafka的生产者(Producer)和消费者(Consumer)的关系,可以通过一个餐厅的例子来形象地说明。

1. 餐厅的故事

想象一个忙碌的餐厅,这里有:

  • 厨师(Producers):负责准备美味的菜肴。
  • 服务台(Kafka Topic):菜肴准备好后,厨师会将它们放到服务台上,服务台有多个部分,每部分代表一个不同类型的菜(即Kafka中的不同Partition)。
  • 服务员(Consumers):负责从服务台上取走菜肴,并将它们送到顾客手中。

在这个餐厅中,有时候会有特别多的订单,厨师需要快速高效地准备菜肴。每当一道菜准备好,他们就会把它放到对应的部分在服务台上。服务台非常长,可以容纳很多菜肴,让不同的服务员能够同时服务多个顾客,提高效率。

2. Kafka的工作方式

  • Producers(厨师):在Kafka中,生产者的角色是发布消息到Topic中。就像厨师准备好菜肴后,会将它们放到服务台的对应部分。
  • Kafka Topic(服务台):Topic是消息的分类,可以细分为多个Partitions(服务台的多个部分),这样可以提高并行处理的能力。每个Partition都是一个独立的队列。
  • Consumers(服务员):消费者从Topic中读取消息。如果有多个消费者在同一个Consumer Group中,它们可以像一队服务员那样协作,每个人负责从服务台的一部分取菜,这样可以更快地服务所有顾客。每个消费者负责读取特定Partition中的消息,确保每条消息都能被及时处理。

3. 生动的场景

假设一天晚上,餐厅接到了一个大型宴会的预订,需要同时准备多道菜。这时,厨师们(Producers)开始忙碌起来,每准备好一道菜,就会放到服务台(Topic)的指定位置(Partition)。服务员们(Consumers)各自负责一部分服务台,快速地将菜肴送到顾客手中。

在这个过程中,如果某一部分的菜准备得特别快,服务台上的这一部分就会堆积更多的菜肴。负责这一部分的服务员需要加快速度,以确保所有的菜肴都能及时送出。这就像在Kafka中,如果某个Partition的消息积压,负责这个Partition的消费者就需要更快地处理消息,以防止延迟。

通过这个例子,我们可以看到,Kafka的Producer和Consumer之间是如何通过Topic(服务台)和Partition(服务台的不同部分)协作的,以实现高效、可靠的消息处理。文章来源地址https://www.toymoban.com/news/detail-826485.html

4. 测试Demo

4.1 KafkaProducer

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
import time

import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

def process():
    # Kafka配置,需自行修改
    bootstrap_servers = ['ip:port']
    producer_topic = 'XXX_topic'

    # Kafka生产者
    producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                api_version=(1,0,0)
            )

    data = {
        "task_id": 1,
        "image_path": "XXX",
        "video_path": "XXX",
        "guidence_text": "XXX",
    }

    # Kafka请求监听
    try:
        res = data

        # 发送结果到Kafka
        producer.send(producer_topic, res)
        logging.info(f"send data to {producer_topic}")
        time.sleep(3)
    except Exception as e:
        # 记录错误日志
        logging.error(f"Error processing kafka request: {e}")


if __name__ == "__main__":
    process()

4.2 KafkaConsumer

from kafka import KafkaConsumer
import json
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

def consume_messages():
    # Kafka配置
    bootstrap_servers = ['ip:port']
    consumer_topic = 'XXX'
    consumer_group = 'XXX'

    # Kafka消费者
    consumer = KafkaConsumer(
        consumer_topic,
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group,
        # auto_offset_reset='earliest',  # 从最早的消息开始读取
        auto_offset_reset= "latest",
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # 解码JSON格式的消息
    )

    logging.info(f"Started consuming messages from {consumer_topic}")

    # 消费消息
    try:
        for message in consumer:
            msg = message.value
            logging.info(f"Received message: {msg}")
            print(f"msg:{msg}")

    except KeyboardInterrupt:
        logging.info("Stopping consumer...")
    except Exception as e:
        logging.error(f"Error while consuming messages: {e}")
    finally:
        consumer.close()

if __name__ == "__main__":
    consume_messages()

到了这里,关于Kafka Producer/Consumer 关系解释及测试demo的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka_02_Producer详解

    Producer (生产者): 生产并发送消息到Broker(推送) Producer是 多线程安全 的(建议通过池化以提高性能) Producer实例后可发送多条消息(可对应多个ProducerRecord) // 0.9之后的版本是基于Java实现(之前是Scala实现) Producer客户端发送消息大致逻辑: 配置Producer客户端参数并创建该Producer实例 构

    2024年02月01日
    浏览(33)
  • 图解系列 图解Kafka之Producer

    开局一张图,其他全靠吹 发送消息流程如下 : 指定bootstrap.servers,地址的格式为 host:port 。它会连接bootstrap.servers参数指定的所有Broker,Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与

    2024年02月09日
    浏览(24)
  • Kafka中的producer拦截器

    在Kafka中,拦截器一共有两种。分别是生产者端的和消费者端的。本文介绍生产者端的拦截器 Kafka Producer拦截器(Interceptor)主要用于实现clients端的定制化控制逻辑。对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如

    2024年02月16日
    浏览(26)
  • 谈谈 Kafka 的幂等性 Producer

    使用消息队列,我们肯定希望不丢消息,也就是消息队列组件,需要保证消息的可靠交付。消息交付的可靠性保障,有以下三种承诺: 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。 至少一次(at least once):消息不会丢失,但有可能被重复发送。 精确一次

    2024年02月14日
    浏览(25)
  • 【Kafka】Kafka consumer lag 为负数

    最近对Kafka 集群部署了 Kafka_exporter 监控,并集成了 granfana 图标展示。 发现 Consumer Group Lag 有时候为负数。 于是进行一番查询,并总结整理下。 从下图可以看出, consumer group 值有时候出现负数的情况。 消息过期了(超过默认7天),已经被清理掉了,这时候 topic 最新的 end

    2024年02月13日
    浏览(42)
  • Failed to construct kafka producer

    问题重述: org.apache.kafka.common.KafkaException: Failed to construct kafka producer Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers Kafka连接不上 解决办法: 将kafka信息输入到hosts文件中 文件地址 使用Notepad++打开,输入ip地址和kafka名称

    2024年01月18日
    浏览(27)
  • 图解Kafka Producer常用性能优化配置参数

    bootstrap.servers:Kafka broker服务器地址列表, , 分开,可不必写全,Kafka内部有自动感知Kafka broker的机制 client.dns.lookup:客户端寻找bootstrap地址的方式,支持两种方式: resolve_canonical_bootstrap_servers_only:依据bootstrap.servers提供的主机名(hostname),根据主机上的名称服务返回其IP地址

    2024年02月03日
    浏览(27)
  • Kafka中Consumer源码解读

    本课程的核心技术点如下: 1、consumer初始化 2、如何选举Consumer Leader 3、Consumer Leader是如何制定分区方案 4、Consumer如何拉取数据 5、Consumer的自动偏移量提交 从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法 这个方法的前面代码部分都是一些配置,我们分析源码要抓核心

    2024年02月09日
    浏览(27)
  • 简单聊聊Kafka的Consumer

    我们知道消息队列一般有两种实现方式,(1)Push(推模式) (2)Pull(拉模式),那么 Kafka Consumer 究竟采用哪种方式进行消费的呢? 其实 Kafka Consumer 采用的是主动拉取 Broker 数据进行消费的即 Pull 模式 。这两种方式各有优劣,我们来分析一下: 1)、为什么不采用Push模式?**如果是选择

    2024年01月15日
    浏览(27)
  • 大数据学习:kafka-producer源码分析

    kafka-1.0.1源码下载地址 2.1 此时我们先撇开源码不说,先来画个原理图。 2.1.1 丢进缓冲区前的操作 首先我们现在是初始化了一个 KafkaProducer 对吧。然后会有一个 ProducerInterceptors ,看这个英文像是拦截器,它会把我们的消息根据一定的规则去过滤掉。但是这个东西其实作用不大

    2024年02月10日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包