flink开发常见问题 —— flink-kafka 依赖版本冲突问题

这篇具有很好参考价值的文章主要介绍了flink开发常见问题 —— flink-kafka 依赖版本冲突问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题描述

由于 flink / kafka 的版本不断更新,创建项目的时候就应当考虑清楚这几个依赖库的版本问题,尽可能地与实际场景保持一致,比如服务器上部署的 kafka 是哪个版本,flink 是哪个版本,从而确定我们需要开发的是哪个版本,并且在真正的开发工作开始之前,应当先测试一下保证 kafka 的版本 、 flink 的版本一致,至少大版本一致,不存在冲突问题,不要为以后的部署埋坑。

解决方案

步骤 1 确定 flink / scala / flink-connect-kafka 的版本

比如 flink 选择的是 1.12.7 这个版本,我们前去 maven 仓库查看 flink-connect-kafka 的版本。首先访问 链接1 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka 找到对应的 1.12.7 这个版本,也就是根据 flink 的版本去寻找 flink-connect-kafka 的版本,记作 链接2 即 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.12.7。

进入 链接2 对应的地址后,可以发现提供的 maven 地址如下:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.7</version>
</dependency>

这个地方已经明确地指出, kafka_2.12 指的是对应的是使用 scala 的版本是 2.12 编写的 kakfa ,也就是对应的是 scala 2.12 的版本。为了确保无误,请确保安装的 kafka 也是这个版本。

类似地,如果是其他版本的 flink 也要找到对应的flink-connector-kafka 版本,确保 kafka 的版本的 scala 是一致的。

pom.xml 案例

为了规范,我们把版本号写在前面,然后再引用这些依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.smileyan.demo</groupId>
    <artifactId>flink-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <properties>
        <java.version>8</java.version>
        <!-- flink 的版本 -->
        <flink.version>1.12.7</flink.version>
        <!-- scala 的版本(也就是 kafka 的源码的版本)-->
        <scala.binary.version>2.12</scala.binary.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
        <slf4j.version>2.0.7</slf4j.version>
    </properties>
    <profiles>
        <profile>
            <id>local</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <flink.scope>compile</flink.scope>
            </properties>
        </profile>
        <profile>
            <id>prod</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <properties>
                <flink.scope>provided</flink.scope>
            </properties>
        </profile>
    </profiles>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-simple</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

扩展

根据实际需要,调整 flink 的版本以及 scala 的版本。一定要确保最终我们在 maven 仓库中能找到对应的版本。

JAVA 代码示例

再次强调:一定要找到对应的版本的示例。新版本的 flink 不再支持 new FlinkKafkaProducer 以及 new FlinkKafkaConsumer 这类操作,所以一定要结合实际情况进行调整。

flink version <= 1.13

消费者

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);

生产者

DataStream<String> stream = ...

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            return new ProducerRecord<>(
                    "my-topic", // target topic
                    element.getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "my-topic",             // target topic
        serializationSchema,    // serialization schema
        properties,             // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

stream.addSink(myProducer);

flink version > 1.13

消费者

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

生产者

DataStream<String> stream = ...
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build()
        )
        .build();
        
stream.sinkTo(sink);

依赖<scope> 设置为 provided

flink 的 job 开发完成以后,需要打包上传到服务器端运行,实际上 flink-java flink-stream 等等依赖包在服务器端的 flink 都提供了这些依赖,所以打包的时候可以去除这些依赖,以减小打包后的 jar 文件的大小。

所以如上面的 pom.xml 文件所示,我们把很多依赖的 scope 设置为 provided,但是带来的新问题就是运行 flink job 的main方法时会出现报错提示这些依赖找不到:

flink开发常见问题 —— flink-kafka 依赖版本冲突问题
这时我们需要进行配置,步骤如下:

flink开发常见问题 —— flink-kafka 依赖版本冲突问题
flink开发常见问题 —— flink-kafka 依赖版本冲突问题
flink开发常见问题 —— flink-kafka 依赖版本冲突问题
再次运行就不会报错了。

参考链接

  • https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
  • https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

总结

flink-kafka 的项目创建本身应当是一件很容易的事情,但是为了避免为以后的开发埋雷,一定要规范地编写依赖,并结合实际情况对版本进行调整,并非一切都应当用最新版本的。

Smileyan
2023-03-25 00:29文章来源地址https://www.toymoban.com/news/detail-465339.html

到了这里,关于flink开发常见问题 —— flink-kafka 依赖版本冲突问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 关于Qt程序打包后运行库依赖的常见问题分析及解决方法

    目录 一. 大致如下常见问题: (1)找不到程序所依赖的Qt库 version `Qt_5\\\' not found (required by (2)Could not Load the Qt platform plugin \\\"xcb\\\" in \\\"\\\" even though it was found (3)打包到在不同的linux系统下,或者打包到高版本的相同系统下,运行程序时,直接提示段错误即segmentation fault,或者I

    2023年04月17日
    浏览(58)
  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 调优指南 & 常见问题》学习笔记

    学习文档: 《Flink 官方文档 - 部署 - 内存配置 - 调优指南》 《Flink 官方文档 - 部署 - 内存配置 - 常见问题》 学习笔记如下: 独立部署模式(Standalone Deployment)下的内存配置 通常无需配置进程总内存,因为不管是 Flink 还是部署环境都不会对 JVM 开销进行限制,它只与机器的

    2024年02月19日
    浏览(45)
  • Flink本地集群部署启动&常见问题的解决方法

    [zhangflink@9wmwtivvjuibcd2e software]$ vim flink/conf/flink-conf.yaml [zhangflink@9wmwtivvjuibcd2e software]$ vim flink/conf/workers [zhangflink@9wmwtivvjuibcd2e software]$ xsync flink/conf/ 启动集群在jobmanager那台机器启动 [zhangflink@9wmwtivvjuibcd2e-0001 flink]$ bin/start-cluster.sh 启动成功jobmanager会出现如下进程 启动成功taskm

    2024年02月02日
    浏览(53)
  • Linux部署Kafka及常见问题记录

    监控 Metrics 网站活动追踪 Website Activity Tracking 日志收集 Log Aggregation 流处理 Stream Processing 事件溯源 Event Sourcing 提交日志 Commit Log Broker 和AMQP里协议的概念一样, 就是消息中间件所在的服务器 Topic(主题) 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上

    2024年02月02日
    浏览(51)
  • 八、Kafka时间轮与常见问题

    Kafka中存在大量的延时操作。 1、发送消息-超时+重试机制 2、ACKS 用于指定分区中必须要有多少副本收到这条消息,生产者才认为写入成功(延时 等) Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(Syst

    2024年02月15日
    浏览(31)
  • Streampark集成Cloudera Flink、ldap、告警,以及部署常见问题

    集成背景 我们当前集群使用的是Cloudera CDP,Flink版本为Cloudera Version 1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置Flink Home,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需

    2023年04月09日
    浏览(44)
  • 大数据_面试_ETL组件常见问题_spark&flink

    问题列表 回答 spark与flink的主要区别 flink cdc如何确保幂等与一致性 Flink SQL CDC 实践以及一致性分析-阿里云开发者社区 spark 3.0 AQE动态优化 hbase memorystore blockcache sparksql如何调优 通过webui定位那个表以及jobid,jobid找对应的执行计划 hdfs的常见的压缩算法 hbase的数据倾斜 spark数据处

    2024年02月16日
    浏览(44)
  • Android开发常见问题

    看下当前工程目录中是否存在gradle目录,如果不存在,创建一个新的工程,拷贝新工程的gradle文件夹到当前工程。gradle中有两个文件。 解决方法: 1.找到c盘下的gradle.properties文件 2.将代理注释 3.在gradle中设置不使用代理,重新加载,问题解决 gradle的版本太老了。将gradle升级

    2024年02月13日
    浏览(50)
  • JAVA开发中常见问题

    目录 1.深浅克隆问题 2.Mysql中可以代替左模糊或全查询的函数方法 3.开发时需注意,使用String类的equals()方法时,原则上需要左边的变量不能为null值,避免程序执行时出现空指针报错 4.Mysql Update的高效应用 5.Mysql Insert 的高效应用 6.在try-catch-finally代码块中return或者throw Exception时需

    2024年02月05日
    浏览(47)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包