flink启动报错Failed to construct kafka producer

这篇具有很好参考价值的文章主要介绍了flink启动报错Failed to construct kafka producer。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink local模式下启动 sink2kafka报错,具体报错如下

apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:56)
......................
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)

提取报错信息

Failed to construct kafka producer

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

代码

flink版本是14.6

kafkaProperties里存的是kafka的信息

flink启动报错Failed to construct kafka producer,flink,大数据

   println(s"========kafka properties========\r\n$kafkaProperties");
    val broker: String = kafkaProperties.getProperty("broker")
    val topic: String = kafkaProperties.getProperty("topic")
    val kafkaSink: KafkaSink[String] = KafkaSink.builder()
      .setBootstrapServers(broker)
      .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
          .setTopic(topic)
          .setValueSerializationSchema(new SimpleStringSchema())
          .build()
      )
      .setKafkaProducerConfig(kafkaProperties)
      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build();
    data.map(record=>JacksonManager.mapper.writeValueAsString(record))
      .sinkTo(kafkaSink).name("sink2kafka")

本地起了一个sink2kafka的demo 也没问题,但是在服务器启动的时候就报错了,试了多次无果,开始分析报错原因。

我们要sink2kafka,那么flink肯定根据我们的kafka信息创建一个kafkaProducer

对应的报错,这里是kafkaProducer的构造器init失败了

org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)

那么为什么init失败了呢?因为这个类ByteArraySerializer 不是Serializer 的实例

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

那么这个为什么不是实例呢?我们在idea里看下

package org.apache.kafka.common.serialization;

public class ByteArraySerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}

这里明明就是,为啥说不是啊。。。需要思考下。

当时我最开始就考虑是jar包冲突,再看下是否冲突,突然想到一个问题,项目中的有两个人

a喜欢打非依赖的jar的包,也就是flink的jar都不打进去,全放到服务器的flink_home/jar里

b喜欢打全依赖的jar包,也就是所有flink的jar都打进去,然后执行。

目前是b的工程,那么会不会是jar冲突了,是自己工程冲突了 还是打的jar和flink_home/jar里的jar冲突了?

先看工程

flink启动报错Failed to construct kafka producer,flink,大数据flink启动报错Failed to construct kafka producer,flink,大数据

 然后我看了服务器的

flink启动报错Failed to construct kafka producer,flink,大数据

那么原因就出来的,排除多余的jar。就正常启动了 

 文章来源地址https://www.toymoban.com/news/detail-559351.html

 

到了这里,关于flink启动报错Failed to construct kafka producer的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink1.17.1消费kafka3.5中的数据出现问题Failed to get metadata for topics [flink].

    问题呈现 Failed to get metadata for topics [flink]. at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47) at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) at org.apache.flink.connecto

    2024年02月11日
    浏览(43)
  • docker启动es报错:failed to obtain node locks...

    在学习es时,使用docker启动时发现一直报错: 查找文章: (1)node.lock被其他进程使用了,这也是网上大多数的解释。解决方案呢,首先查看es的进程,然后杀掉。 ps aux | grep elastic kill -9 [pid] 然而我的并没有进程占用 (2)有篇文章写的是目录权限不对 我的docker启动命令是 使

    2024年02月13日
    浏览(40)
  • Kafka报错:Controller 219 epoch 110 failed to change state for partition

    集群里面kafka报错: Controller 219 epoch 110 failed to change state for partition maxwell_atlas-0 from OfflinePartition to OnlinePartition kafka.common.stateChangeFailedException: Failed to elect leader for partition maxwell_atlas-0 under strategy OfflinePartitionLeaderElectionStrategy 错误原因: 新增加的副本的offset比leader的新,所以在

    2024年02月15日
    浏览(38)
  • zookeeper启动报错出现Starting zookeeper ... FAILED TO START详细解决方案

    第1步 来到zkServer.sh 的文件目录下,执行 发现是 此节点上的Zookeeper 所处阶段与 当前ClouderaManager中Zookeeper 的阶段 不匹配 ,导致无法启动此节点上面的 Zookeeper Quorum Server 。   第2步 来到zkData,目录,然后清空version-2文件夹 和删除zookeeper-server.pid文件并同步其他服务器 第3步

    2024年02月02日
    浏览(45)
  • 【解决方法】各类软件启动报错:Failed to create the Java Virtual Machine

    工具:小锐云服 PRO ,Windows 命令处理器,Java 环境 系统版本:Windows 10 描述:不知名原因导致的 Java 虚拟机创建失败,百度良久后通过修改系统环境变量,完成了对问题的处理。 提示:若按照教程还是无法完成操作,可以进入右侧的企鹅,找我看看。 视频教程: 文字教程:

    2024年02月12日
    浏览(41)
  • 启动jenkins报错 Failed to start Jetty或Failed to bind to 0.0.0.0/0.0.0.0:8080或Address already in use

    安装jenkins就不说了,能走到这一步的都知道。因我安装的是比较新的jenkins版本 这些问题只在新版本的jenkins安装出现,旧版本的倒是没有遇见过 使用systemctl start jenkins 启动jenkins之后会提示如下信息 Job for jenkins.service failed because the control process exited with error code. See “systemct

    2024年02月01日
    浏览(40)
  • 银河麒麟V10桌面版Docker启动报错:failed to create NAT chain DOCKER: iptables failed

    module=libcontainerd namespace=plugins.moby failed to start daemon: Error initializing network controller: error obtaining controller instance: failed to create NAT chain DOCKER: iptables failed: iptables --wait -t nat -N DOCKER: iptables: Invalid argument. Run `dmesg\\\' for more information. 这个错误通常与 Docker 无法创建必要的 iptables 链有关。

    2024年01月17日
    浏览(41)
  • Reason: Failed to determine a suitable driver class 项目启动报错解决

    今天遇到了这个问题,因为在网关服务的pom.xml文件中引用了其他模块,而其他模块有DataSource相关的依赖,我的配置文件中没有对应的配置,所以报错了。顺便总结一下吧 报错信息如下: 2021-04-01 10:47:19.255 ERROR 3249 --- [  restartedMain] o.s.b.d.LoggingFailureAnalysisReporter   :  *********

    2024年02月05日
    浏览(72)
  • 解决Jenkins-2.396启动报错:Failed to start Jenkins Continuous Integration Server.

    场景:现有环境已经使用 Java 8 在运行业务,安装 Jenkins 后启动报错。 原因:因为 Jenkins-2.396 依赖于 Java 11 版本才能启动。 解决方法: yum 安装Java11 修改Jenkins 启动文件 重启Jenkins

    2024年02月14日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包