Kafka 监听器详解

这篇具有很好参考价值的文章主要介绍了Kafka 监听器详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka 监听器详解

Kafka Assistant 是一款 Kafka GUI 管理工具——管理Broker,Topic,Group、查看消费详情、监控服务器状态、支持多种消息格式。


你需要将 advertised.listeners(如果你使用Docker镜像,则为 KAFKA_ADVERTISED_LISTENERS)设置为外部地址(host/IP),以便客户端可以正确地连接到它。否则,他们会尝试连接到内部主机地址–如果无法到达,问题就会接踵而来。

换一种说法,由Spencer Ruport提供。

listeners 是Kafka所绑定的接口。advertised.listeners 是客户端的连接方式。

kafkacat,kafka,java,分布式,ui

在这篇文章中,我将谈论为什么这是有必要的配置 listenersadvertised.listeners,然后展示如何基于几个场景–Docker和AWS来做。

是谁在监听

Apache Kafka是一个分布式系统。数据是从一个给定的分区的领导者那里读取和写入的,这个领导者可以是集群中的任何一个Broker。当一个客户端(生产者/消费者)启动时,它将请求关于哪个broker是一个分区的领导者的元数据–它可以从任何broker那里做到这一点。返回的元数据将包括该分区的领导者broker的可用端点,然后客户端将使用这些端点连接到broker,根据需要读/写数据。

正是这些端点给人们带来了麻烦。在单机上,运行裸机(没有虚拟机,没有Docker),可能只需要使用主机名(或只是localhost),这很容易。但是,一旦你进入更复杂的网络设置和多节点,你就必须更加注意。

让我们假设你有一个以上的网络。比如:

  • Docker内部网络()加上主机
  • 云中的Broker(如AWS EC2)和本地的企业内部机器(甚至在另一个云中)。

你需要告诉Kafka Broker如何相互联系,但也要确保外部客户端(生产者/消费者)可以联系到他们需要联系的Broker。

最关键的是,当你运行一个客户端时,你传递给它的Broker只是它要去的地方,并从那里获得集群中Broker的元数据。它为读/写数据而连接的实际主机和IP是基于Broker在初始连接中传递回来的数据–即使它只是一个单一的节点,而且返回的Broker与它所连接的Broker相同。

为了正确配置,你需要了解Kafka Broker可以有多个监听器。一个监听器是一个组合:

  • Host/IP
  • Port
  • Protocol

让我们来看看一些配置。通常情况下,协议也被用于监听器的名称,但在这里,让我们通过使用监听器的抽象名称来使它变得漂亮和清晰。

KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB

我使用的是Docker配置名称–如果你直接配置 server.properties(例如,在AWS等),其对应名称在以下列表中缩进显示。

  • KAFKA_LISTENERS 是一个以逗号分隔的监听器列表,以及Kafka绑定监听的主机/IP和端口。对于更复杂的网络,这可能是一个与机器上的特定网络接口相关的IP地址。默认是0.0.0.0,这意味着在所有接口上进行监听。
    • listeners
  • KAFKA_ADVERTISED_LISTENERS 是一个以逗号分隔的监听器列表,包括它们的主机/IP和端口。这是传回给客户端的元数据。
    • advertised.listeners
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP 定义了每个监听器名称要使用的安全协议的键/值对。
    • listener.security.protocol.map

Kafka Broker之间的通信,通常是在内部网络(如Docker网络,AWS VPC等)。要定义使用哪个监听器,请指定 KAFKA_INTER_BROKER_LISTENER_NAMEinter.broker.listener.name)。使用的主机/IP必须是其他人可以从Broker机器上访问的。

Kafka客户端很可能不在Broker的网络中,这就是额外监听器的作用。

每个监听器在被连接到时,都会报告它可以被接连到的地址。你到达Broker的地址取决于所使用的网络。如果你从内部网络连接到经纪人,它将是一个不同于外部连接的主机/IP。

当连接到Broker时,返回给客户端的监听器将是你连接到的监听器(基于端口)。

kafkacat是一个探索这个问题的有用工具。使用 -L,你可以看到你所连接的监听器的元数据。基于上述相同的监听器配置(LISTENER_BOB/LISTENER_FRED),注意观察返回的结果:

  • 在9092端口(我们映射为 LISTENER_FRED)进行连接,Broker的地址被反馈为 localhost
$ kafkacat -b kafka0:9092 \
       -L
Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
1 brokers:
broker 0 at localhost:9092
  • 在29092端口(我们将其映射为 LISTENER_BOB)进行连接,Broker的地址被反馈为kafka0。
$ kafkacat -b kafka0:29092 \
           -L
Metadata for all topics (from broker 0: kafka0:29092/0):
1 brokers:
  broker 0 at kafka0:29092

你也可以用tcpdump来检查连接到Broker服务器的客户端的流量,并发现从代理服务器返回的主机名。

为什么我可以连接到Broker,但客户端仍然失败?

即使你能与Broker建立初始连接,在元数据中返回的地址仍然可能是一个你的客户端无法访问的主机名。

让我们一步一步来分析。

  1. 我们在AWS上有一个Broker。我们想从我们的笔记本电脑向它发送一个消息。我们知道EC2实例的外部主机名(ec2-54-191-84-122.us-west-2.compute.amazonaws.com)。我们已经在安全组中创建了必要的条目,为我们的入站流量打开Broker的端口。保险起见,还要检查我们的本地机器是否可以连接到AWS实例上的端口。
$ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
found 0 associations
found 1 connections:
    1:  flags=82<CONNECTED,PREFERRED>
  outif utun5
  src 172.27.230.23 port 53352
  dst 54.191.84.122 port 9092
  rank info not available
  TCP aux info available

我们连上了9092端口,继续

echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test

这一步发生了什么:

  • 我们的笔记本电脑成功地解析了 ec2-54-191-84-122.us-west-2.compute.amazonaws.com(到IP地址 54.191.84.122),并在9092端口连接到AWS的机器。
  • Broker在端口9092上接收入站连接。它把元数据返回给客户端,主机名是ip-172-31-18-160.us-west-2.compute.internal,因为这是Broker的主机名和监听器的默认值。
  • 然后,客户端试图使用它被赋予的元数据向代理发送数据。由于ip-172-31-18-160.us-west-2.compute.internal不能从互联网上解析,所以它失败了。
$ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
>>[2022-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
  • 但是,如果我们从Broker所在的机器上尝试同样的事情。
$ echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
>>
$ kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
foo

它工作得很好! 这是因为我们正在连接到 9092 端口,它被配置为内部监听器,因此报告其主机名为 ip-172-31-18-160.us-west-2.compute.internal,这可以从Broker机器上解析(因为这是它自己的主机名!)。

  • 我们可以通过使用kafkacat -L标志,看到Broker返回的元数据。
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

很明显,内部主机名被返回。这也使得这个看似混乱的错误变得更有意义–连接到一个主机名,在另一个主机上得到一个查询错误。

$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
% ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known

这里,我们在本地机器上使用消费者模式(-C)的kafkacat来尝试从主题中读取。和以前一样,因为我们从元数据中的Broker那里得到了内部监听器的主机名,所以客户端无法解析这个主机名来进行读/写。

修改我的hosts文件

我看到一个Stack Overflow的答案,建议直接更新我的hosts文件…这不是更简单吗?
这可以解决问题,而不是真正修复它。

如果Broker服务器报告了一个客户端无法连接的主机名,那么在本地 /etc/hosts 中硬编码主机名/IP组合可能看起来是一个很好的修复。但这是一个非常脆弱的、手动的解决方案。当IP发生变化时,当你移动主机而忘记带配置hosts时,以及当其他人想做同样的事情时,会发生什么?

了解并实际修复你的网络的 advertised.listeners 设置要好得多。

如何连接在Docker上的Kafka

kafkacat,kafka,java,分布式,ui

为了在Docker中运行,你需要为Kafka配置两个监听器。

  • **Docker网络内的通信。**这可能是Broker之间的通信以及在Docker中运行的其他组件之间的通信,如Kafka Connect或第三方客户端或生产者。对于这些通信,我们需要使用Docker容器的主机名。同一Docker网络上的每个Docker容器将使用Kafka Broker容器的主机名来到达它。

  • **非Docker网络流量。**这可能是在Docker主机上本地运行的客户端,例如。假设他们将在localhost上连接到从Docker容器暴露出来的端口。这是Docker Compose的片段。

    • Docker网络中的客户端使用监听器BOB进行连接,端口为29092,主机名为kafka0。 这样做,他们会得到要连接的主机名kafka0。每个Docker容器将使用Docker的内部网络来解析kafka0,并能够到达Broker。
    • Docker网络外部(但都在同一个Host Machine里)的客户端使用监听器FRED连接,端口为9092,主机名为localhost。9092端口是由Docker容器暴露的,因此可以连接到。当客户端连接时,他们被赋予Broker元数据的主机名localhost,因此在读/写数据时连接到此。
    • 上述配置无法处理Docker外部和主机外部的客户端想要连接的情况。这是因为无论是kafka0(Docker内部的主机名)还是localhost(Docker主机的环回地址)都是无法解析的。

如何连接到AWS/IaaS上的Kafka

我命名AWS是因为它是大多数人使用的,但这适用于任何IaaS/云解决方案。

这里适用的概念与Docker完全相同。主要的区别是,在Docker中,外部连接很可能只是在localhost上(如上),而在云托管的Kafka中(如AWS上),外部连接将来自非本地的机器。

另一个复杂的问题是,虽然Docker网络与主机的网络严重隔离,但在IaaS上,外部主机名往往是可以在内部解析的,这使得你可能真正遇到这些问题时,一发不可收拾。

有两种方法,取决于你要连接到Broker服务器的外部地址是否也可以在网络(如VPC)上的所有Broker服务器上进行本地解析。

选项1:外部地址是可以在本地解析的

kafkacat,kafka,java,分布式,ui

在这里,你可以用一个监听器搞定。现有的监听器叫 PLAINTEXT,只需要设置 advertised.listeners(即传递给入站客户的那个)。

advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092

现在内部和外部的连接都将使用 ec2-54-191-84-122.us-west-2.compute.amazonaws.com 进行连接。因为 ec2-54-191-84-122.us-west-2.com compute.amazonaws.com 既可以在本地也可以在外部解决,所以事情进展顺利。

选项2:外部地址在本地无法解析

你将需要为Kafka配置两个监听器。

  • **AWS网络(VPC)内的通信。**这可能是Broker之间的通信和在VPC中运行的其他组件之间的通信,如Kafka Connect或第三方客户端或生产商。对于这些通信,我们需要使用EC2机器的内部IP(或主机名,如果配置了DNS)。
  • **外部AWS流量。**这可能是测试来自笔记本电脑的连接,或者只是来自不在亚马逊托管的机器。在这两种情况下,需要使用实例的外部IP(或主机名,如果配置了DNS)。

kafkacat,kafka,java,分布式,ui

下面是一个例子:

listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
inter.broker.listener.name=INTERNAL

原文地址:文章来源地址https://www.toymoban.com/news/detail-734660.html

  • Kafka Listeners – Explained

到了这里,关于Kafka 监听器详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【游戏引擎Easy2D】基于基础类型展开的监听器学习详解

       🧛‍♂️ iecne个人主页: 点赞关注收藏评论支持哦~ 💡每天 关注 iecne的作品,一起进步 👉本文收录 专栏 :【C++游戏引擎】 🐳希望大家多多支持🥰一起进步呀! 哈喽大家好,我是 iecne ,本期为大家带来的是CPP/C++【游戏引擎Easy2D】一篇打通引擎顶级类型,Listener。包

    2024年01月17日
    浏览(88)
  • 微信小程序(四)--- 自定义组件详解(properties,数据监听器,纯数据字段,插槽,父子间通信,behaviors)

    目录 一、创建组件 二、引用组件 1、局部引用 2、全局引用 三、组件和页面的区别 四、组件样式隔离 1、注意点 2、修改组件的样式隔离选项  五、数据、方法、属性 1、data数据 2、methods方法 3、properties属性  4、data和properties的区别  5、使用setData修改properties的值  六、数据

    2024年01月24日
    浏览(52)
  • HttpSessionListener监听器和HttpSessionAttributeListener监听器

    1.作用:监听Session创建或销毁,即生命周期监听 2.相关方法: 3.使用场景: 和前面的ServletContextListener等一样,可以用于监控用户上线和离线 4.代码 HttpSessionListener监听器 实现类 HttpSessionAttributeListener监听器 1.作用:监听Session属性的变化,使用少 2.相关方法: 3.代码 监听器 实

    2024年02月04日
    浏览(51)
  • Listener监听器----HttpServletRequest对象的生命周期监听器

    一、HttpServletRequest对象的生命周期监听器         ServletRequestListener接口定义了ServletRequest(是HttpServletRequest接口的父接口类型)对象生命周期的监听行为。 void requestInitialized(ServletRequestEvent sre)         HttpServletRequest对象创建后会触发该监听器方法,并将已创建HttpServletR

    2024年01月23日
    浏览(61)
  • camunda执行监听器和任务监听器有什么区别

    Camunda的执行监听器和任务监听器是用于添加自定义逻辑的监听器,它们的区别在于作用对象和触发事件的不同。 执行监听器是与BPMN流程中的各种流程元素(例如开始事件、用户任务、服务任务、网关等)相关联的。执行监听器可以在流程元素执行前、执行后或抛出异常时添

    2024年02月04日
    浏览(56)
  • 消息监听器和消息监听容器

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 消息监听器顾名思义用来接收消息,它是使用消息监听容器的必须条件。目前有8个消息监听器: 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 p

    2024年02月07日
    浏览(53)
  • Android手势监听、触摸监听器、onTouchListener

    一次点击事件(onClickListener)由 一次down事件,多次move事件和一次up事件构成,move事件出现的次数由用户的按压效果决定。down表示手势事件开始,up表示结束,move则代表着过程。此时代码中onTouchLishtener方法中返回的结果为ture,则表示拦截用户的该次行为,由此方法进行处理

    2024年02月04日
    浏览(50)
  • watch监听器三种监听方式

    1、普通监听( 无法监听到第一次绑定的变化 ) 这样使用watch时有一个特点,就是当值第一次绑定的时候,不会执行监听函数,只有值发生改变才会执行。 2、普通监听( 可以监听到第一次绑定的变化) 给 text 绑定了一个handler方法,之前我们写的 watch 方法其实默认写的就是

    2024年02月15日
    浏览(46)
  • 【JavaWeb】9—监听器

    ⭐⭐⭐⭐⭐⭐ Github主页👉https://github.com/A-BigTree 笔记链接👉https://github.com/A-BigTree/Code_Learning ⭐⭐⭐⭐⭐⭐ 如果可以,麻烦各位看官顺手点个star~😊 如果文章对你有所帮助,可以点赞👍收藏⭐支持一下博主~😆 可见Java设计模式中的观察者模式。 观察者:监控『被观察者』

    2023年04月09日
    浏览(48)
  • 计算属性和监听器

    计算属性是Vue中非常常用的一个概念,它能够根据现有的数据直接生成一个新的数据进行展示和操作。在Vue的实例中,可以使用 computed 来定义计算属性。 例如,我们有一个Vue实例 vm ,其中有一个数据 message ,我们可以通过计算属性 reversedMessage 来生成消息的反转字符串: 在

    2024年02月15日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包