kafka忽略集群Node信息,直接向`bootstrap.servers`地址发送消息

这篇具有很好参考价值的文章主要介绍了kafka忽略集群Node信息,直接向`bootstrap.servers`地址发送消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

当kafka单机安装的时候或者集群安装的时候,kafka会先通过bootstrap.servers获取集群节点。
有时候网络复杂的时候

  • 如内网外部署
  • 地址映射
  • 代理转发等

bootstrap.servers配置地址可能为一个公网地址181.39.77.53:9092,然而返回的节点为内网地址172.16.31.33:9092,此时由于未开通172.16.31.33:9092网络,导致访问失败。

此时通常有两种解决方案文章来源地址https://www.toymoban.com/news/detail-490989.html

  • 开通kafka返回节点的网络
  • 修改kafka连接方式,忽略返回的node节点信息,直接访问bootstrap.servers配置的地址
    这里主要介绍下方案二,直接访问bootstrap.servers配置的地址

实现原理

  • 利用类加载机制,重复的类只会加载一次。
  • 同名类先加载的类先生效,后加载的类被忽略
  • 优先加载运行jar包内类。
    可以通过在项目内新建同名类(包名也要相同),修改源码覆盖的方式来实现。

实现代码

  • 在项目内新建org.apache.kafka.clients.NetworkClient
  • 通过修改initiateConnect(Node node, long now)实现
  /**
     * Initiate a connection to the given node
     * @param node the node to connect to
     * @param now current time in epoch milliseconds
     */
    private void initiateConnect(Node node, long now) {

        String nodeConnectionId = node.idString();

        try {
            connectionStates.connecting(nodeConnectionId, now, node.host());
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);

            // 开启单机版kafka连接
            // 如果单机kafka 直接连接`bootstrap.servers`中配置的节点
            if (HookConnectProperties.hooked) {
                KafkaChannel channel = ((Selector) selector).channel("-1");

                // 如果channel为空说明访问的`bootstrap.servers`
                // 不为空则开始访问kafka返回的集群节点
                if(channel != null) {
                    InetSocketAddress remoteAddress = null;
                    try {
                        // 尝试通过反射方式获取`remoteAddress`
                        Field field = KafkaChannel.class.getDeclaredField("remoteAddress");
                        field.setAccessible(true);
                        remoteAddress = (InetSocketAddress)field.get(channel);
                        log.debug("Initiating connection to node {} using address {}", node, address);
                        selector.connect(nodeConnectionId,
                                remoteAddress,
                                this.socketSendBuffer,
                                this.socketReceiveBuffer);

                    } catch (NoSuchFieldException | IllegalAccessException e) {
                        // 获取不到则获取远程地址
                        address = channel.socketAddress();
                        log.debug("Initiating connection to node {} using address {}", node, address);
                        selector.connect(nodeConnectionId,
                                new InetSocketAddress(address, node.port()),
                                this.socketSendBuffer,
                                this.socketReceiveBuffer);
                    }
                    return;
                }
            }
            log.debug("Initiating connection to node {} using address {}", node, address);
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            connectionStates.disconnected(nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
        }
    }

到了这里,关于kafka忽略集群Node信息,直接向`bootstrap.servers`地址发送消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【】Ubuntu Server 22.04 LTS 设置主机信息以及IP地址

    得知网卡为 ens160 编辑配置文件 设置 managed=true network之下全部删除,添加 renderer: NetworkManager 此时会断网一下,如果使用DHCP的话,可能会获得新IP地址。 查看网卡信息 得知:ens160 连接到 Wired connection 1 配置主机名,设置IP地址 此时会断网。然后使用新IP地址连接。 添加位置,

    2024年02月16日
    浏览(58)
  • k8s集群Node节点管理:节点信息查看及节点label标签管理

    如果是kubeasz安装,所有节点(包括master与node)都已经可以对集群进行管理 如果是kubeadm安装,在node节点上管理时会报如下错误 只要把master上的管理文件 /etc/kubernetes/admin.conf 拷贝到node节点的 $HOME/.kube/config 就可以让node节点也可以实现kubectl命令管理 1, 在node节点的用户家目录创建

    2024年02月03日
    浏览(50)
  • 使用javaApi监控 kafka 集群的环境下消费组的积压信息

    需求:提供一个能够监控 kafka 集群的环境下消费组的积压信息。当某个消费组积压的信息超过设定的阈值的时候,程序主动告警提醒。 难点: 集群环境,有多个机器。 每个机器上存在多个主题,多个消费组。 使用javaapi查询 思路: 1。先获取集群环境下某台机子下的所有主

    2024年02月12日
    浏览(28)
  • 部署kafka集群后创建主题超时Timed out waiting for a node assignment. Call:createTopics (kafka.admin.TopicCommand

    一、起初无非就是更改server.properties中的配置,以下三项 1.14.247.152的server.properties 159.75.241.252的server.properties 112.74.188.40的server.properties 二、分别在三台服务器的bin目录下启动kafka 三、jps命令查看状态启动成功后,连接kafka并创建topics 报错: 大概意思是等待分配结点超时。 查

    2023年04月19日
    浏览(42)
  • 查看Kafka集群下所有的topic报错“Timed out waiting for a node assignment. Call: listTopics“

    没有配置主机IP地址。 取消掉 listeners 的注释,然后修改值为 listeners=PLAINTEXT://192.168.88.142:9092 ,其中 192.168.88.142 是当前服务器的IP地址。 注意:Kafka集群中每台服务器上的 server.properties 配置文件都需要修改 listeners 配置项,都修改为自己对应服务器的IP地址。 修改配置后,关

    2024年02月16日
    浏览(47)
  • 【node-1】node validation exception. bootstrap checks failed

    从报错信息中看到,文件,虚拟内存的最大值太低,我们需要调整设置虚拟内存大小,以满足ElasticSearch 运行需求。 按照图中的要求,分别编辑 /etc/security/limits.conf 文件和 /etc/sysctl.conf 文件,添加内容。 在第一个文件limits.conf 中添加时,把提示内容的 * 星号也带上。 第二个

    2024年02月15日
    浏览(45)
  • git忽略node_modules文件

    忽略所有node_modules文件夹 些文件无需纳入 Git 的管理,也不希望它们总出现在未跟踪文件列表。 在这种情况下,我们可 以创建一个名为 .gitignore 的配置文件,列出要忽略的文件的匹配模式。 文件 .gitignore 的格式规范如下: ① 以 # 开头的是注释 ② 以 / 结尾的是目录 ③ 以

    2024年02月07日
    浏览(56)
  • IDEA设置忽略node_modules

    项目中有node_modules,在idea打开时,idea也会扫描该目录并索引,有时会导致卡死。 可以在 文件 ---- 设置 ---- 编辑器 ---- 文件类型,将node_modules设置为忽略文件。 修改 项目.iml 文件,在content节点增加 excludeFolder url=\\\"file://$MODULE_DIR$/node_modules\\\" /

    2024年02月08日
    浏览(49)
  • git 忽略 node_modules 文件夹

    git 忽略 node_modules 文件夹 在开发过程中,如果不想跟踪 node_modules 文件中的许多更改,可以在项目中创建一个 .gitignore 文件,用来设置 git 忽略该文件夹。 忽略根文件夹中的 node_modules 文件夹 文件结构: .gitignore 中提到的文件/文件夹不会被 git 跟踪 。 所以要忽略 node_modules,

    2024年02月12日
    浏览(69)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包