[Flink04] Flink部署实践

这篇具有很好参考价值的文章主要介绍了[Flink04] Flink部署实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

    Flink部署支持三种模式:本地部署、Standalone部署、Flink on Yarn部署。

    独立(Standalone)模式由Flink自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    Flink on Yarn模式,把资源管理交给Yarn实现,计算机资源统一由Haoop Yarn管理,生产环境测试。

     Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处.

1 基础环境

1.1 服务器环境

操作系统环境为CentOS 8。

1)配置规划集群节点间免密访问

参考相关章节或附录的指南配置,可以有效提供部署和管理效率。

2)配置JAVA环境

参考相关章节或附录的指南配置。

3)配置HDFS存储集群

如果需要与HDFS存储集群集成,则需要提前完成配置。

参考相关章节或附录的指南配置,并且Flink规划集群或设备可网络访问。

4)配置zookeeper集群

如果需要部署Standalone模式,则需要提前完成配置。参考相关章节或附录的指南配置。

5)配置 Yarn集群

如果需要部署Flink on Yarn模式,则需要提前完成配置。参考相关章节或附录的指南配置。

1.2 Flink软件基础配置

在本实践案例中,采用的Flink软件包版本 1.14.5,Hadoop的版本为3.2,Spark软件的根目录(SPARK_HOME)为/opt/flink/flink。

源码下载可以通过官方源和国内源两种方式下载,官方源再国外,下载速度慢,国内源采用清华大学的源,速度相对较快,但只保留最新版本。

Apache官方:https://archive.apache.org/dist/flink/

清华大学:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

下载软件包:

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz

解压软件包:

# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz
# ln -s flink-1.14.5 flink
# ls flink/
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt

2 本地模式

最简单的启动方式,其实是不搭建集群,直接本地模式启动。

2.1 配置部署

在本地模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟 Flink 的进程,适用于测试开发调试等,不用更改任何配置信息,只需要保证 JDK8 安装正常即可。

1)启动命令

# /usr/local/flink/bin/start-cluster.sh

2)关闭命令

# /usr/local/flink/bin/stop-cluster.sh

2.2 测试验证

1)Flink启动

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake01.
Starting taskexecutor daemon on host lake01

2)访问验证

通过<local主机地址>:8081打开

[Flink04] Flink部署实践,数据湖,flink,大数据

3)jps查看

# jps
3968 Jps
1941 NameNode
3685 TaskManagerRunner
2790 NodeManager
3418 StandaloneSessionClusterEntrypoint
2159 DataNode

4)执行官方用例WordCount

通过执行官方示例,可以看到flink任务运行成功

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID d6987ed5b263fc5e297d5be1e28b465a
Program execution finished
Job with JobID d6987ed5b263fc5e297d5be1e28b465a has finished.
Job Runtime: 373 ms
Accumulator Results:
- 843a1470cb2c3e3169dfb25bcda7369d (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
……

观察Flink WebUI,如下图

[Flink04] Flink部署实践,数据湖,flink,大数据

2.3 问题-提示无法连接Yarn服务

一、问题描述

从flink on yarn模式切换为本地模式,执行start-cluster.sh提示如下错误:

2022-10-13 20:38:05,757 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink/flink-1.14.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2022-10-13 20:38:05,946 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at **lake02/******:8032
2022-10-13 20:38:06,017 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-10-13 20:38:07,043 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: **lake02/******:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
……
2022-10-13 20:38:16,052 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: **lake02/******:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused

二、问题分析

通过jps发现缺少TaskManagerRunner。

# jps
1902910 StandaloneSessionClusterEntrypoint
1861160 RunJar
1903349 SqlClient
1861350 RunJar
128746 NameNode
200744 QuorumPeerMain
1865793 Kafka
1911504 Jps

发现workers和masters文件均为空

三、解决方案

恢复masters和workers的内容

# cat masters
localhost:8081

# cat workers
localhost

3 Standalone模式

3.1 概述

     Standalone模式是最简单的一种集群模式,不需要Yarn、mesos等资源调度平台,自带集群,资源管理由flink集群管理,开发环境测试使用。

    Standalone模式是一种主从模式,主要有两个组件构成分别是JobManager(Master)和TaskManager(Slave)。

当一个应用提交执行时,Flink的各个组件是如何交互协作的:

[Flink04] Flink部署实践,数据湖,flink,大数据

1)App程序通过rest接口提交给Dispatcher(rest接口是跨平台,并且可以直接穿过防火墙,不需考虑拦截)。

2)Dispatcher把JobManager进程启动,把应用交给JobManager。

3)JobManager拿到应用后,向ResourceManager申请资源(slots),ResouceManager会启动对应的TaskManager进程,TaskManager空闲的slots会向ResourceManager注册。

4)ResourceManager会根据JobManager申请的资源数量,向TaskManager发出指令(这些slots由你提供给JobManager)。

5)接着,TaskManager可以直接和JobManager通信了(它们之间会有心跳包的连接),TaskManager向JobManager提供slots,JobManager向TaskManager分配在slots中执行的任务。

6)最后,在执行任务过程中,不同的TaskManager会有数据之间的交换。

3.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

OScentos8.0

labnode02

192.168.80.132

slavetaskmanager

OScentos8.0

labnode03

192.168.80.133

slavetaskmanager

OScentos8.0

二、修改配置文件

1)修改flink-conf.yaml配置文件:

##配置master节点ip
jobmanager.rpc.address: 192.168.1.100

##配置每个节点的可用slot,1 核CPU对应 1 slot
##the number of available CPUs per machine
taskmanager.numberOfTaskSlots: 30

##默认并行度 1 slot资源
parallelism.default: 1

2)修改master和work配置文件

Master文件

# cat masters
labnode01:8081

workers文件

labnode02
labnode03

将以上文件分发各节点对应文件夹。

三、集群启动和关闭

在master节点上执行此脚本,就可以启动集群,前提要保证master节点到slaver节点可以免密登录。

因为它的启动过程是:先在master节点启动jobmanager进程,然后ssh到各slaver节点启动taskmanager进程。

启动集群

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake02.
Starting taskexecutor daemon on host lake03.
Starting taskexecutor daemon on host lake04.
Starting taskexecutor daemon on host slake05.

停止集群:

# /usr/local/flink/bin/stop-cluster.sh

3.3 运行验证

1)启动Flink

# /usr/local/flink/bin/start-cluster.sh

2)访问flink webUI

[Flink04] Flink部署实践,数据湖,flink,大数据

3)执行官方用例WordCount

执行命令:

# /usr/local/flink/bin/flink run /usr/local/flink/examples/batch/WordCount.jar
……
- f27663f6191a378629eea720a988cc53 (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
……

查看Flink WebUI

[Flink04] Flink部署实践,数据湖,flink,大数据

4 Flink On Yarn模式

4.1 概述

    独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    在目前大数据生态中,国内应用最为广泛的资源管理平台是Yarn。Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处。

    Flink on Yarn 企业生产环境运行Flink任务大多数的选择。

    在强大的Yarn平台上,Flink是如何在Yarn上集成部署的,其过程是:客户端把Flink 应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的 NodeManager 申请容器。在这些容器上,Flink会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的 Slot 数量动态分配TaskManager资源。

[Flink04] Flink部署实践,数据湖,flink,大数据

1)提交App之前,先上传Flink的Jar包和配置到HDFS,以便JobManager和TaskManager共享HDFS的数据。

2)客户端向ResourceManager提交Job,ResouceManager接到请求后,先分配container资源,然后通知NodeManager启动ApplicationMaster。

3)ApplicationMaster会加载HDFS的配置,启动对应的JobManager,然后JobManager会分析当前的作业图,将它转化成执行图(包含了所有可以并发执行的任务),从而知道当前需要的具体资源。

4)接着,JobManager会向ResourceManager申请资源,ResouceManager接到请求后,继续分配container资源,然后通知ApplictaionMaster启动更多的TaskManager(先分配好container资源,再启动TaskManager)。container在启动TaskManager时也会从HDFS加载数据。

5)最后,TaskManager启动后,会向JobManager发送心跳包。JobManager向TaskManager分配任务。

Flink提供了yarn上运行的3模式,分别为Session-Cluster,Application Mode和Per-Job-Cluster模式。

4.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

labnode02

192.168.80.132

slavetaskmanager

labnode03

192.168.80.133

slavetaskmanager

二、Yarn环境配置

在Yarn-site.xml中配置关闭内存校验。

Yarn-site.xml是hadoop中/etc/hadoop下的配置文件,否则flink任务可能会因为内存超标而被Yarn集群主动杀死。

<!-- Mem Check Start -->
<!-- 设置不检查虚拟内存的值,不然内存不够会报错 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
    <!-- Mem Check end -->

将修改后的配置文件分发到各节点,然后重启Yarn集群。

三、将Flink软件和配置文件分发到Flink集群规划节点

将Flink的配置文件conf/flink-conf.yaml恢复为初始状态。

4.3 Session-Cluster模式(yarn-session)

4.3.1 概述

[Flink04] Flink部署实践,数据湖,flink,大数据

    Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手工停止。

    在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.

    缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.

    所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job。

会话模式有两种操作模式:

  1. 附加模式(默认):yarn-session.sh客户端将 Flink 集群提交给 YARN,但客户端一直在运行,跟踪集群的状态。如果集群失败,客户端将显示错误。如果客户端被终止,它也会发出集群关闭的信号。
  2. 分离模式(-d或--detached):yarn-session.sh客户端将 Flink 集群提交给 YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。

4.3.2 常用命令

1)yarn-session.sh参数说明

使用bin/yarn-session.sh --help 查看可用参数:

Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on Yarn
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running Yarn session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Set to Yarn-cluster to use Yarn execution mode.
     -nl,--nodeLabel <arg>           Specify Yarn node label for the Yarn application
     -nm,--name <arg>                Set a custom name for the application on Yarn
     -q,--query                      Display available Yarn resources (memory, cores)
     -qu,--queue <arg>               Specify Yarn queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--Yarndetached              If present, runs the job in detached mode (deprecated; use non-Yarn specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

2)启动命令

使用Yarn-session.sh命令申请资源初始化一个Flink集群,命令格式如下:

bin/yarn-session.sh <参数>

如示例:

# /opt/flink/flink/bin/yarn-session.sh -d

3)关闭Flink

停止 flink on Yarn 会话模式中的flink集群

yarn application -kill <appid>

echo "stop" | ./bin/flink -id <appid>

如示例:

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.3.3 运行验证

1)启动Flink

# /opt/flink/flink/bin/yarn-session.sh -d

执行结果:

[Flink04] Flink部署实践,数据湖,flink,大数据

访问Yarn WebUI:

[Flink04] Flink部署实践,数据湖,flink,大数据

访问Flink WebUI,http://lake04:38347

[Flink04] Flink部署实践,数据湖,flink,大数据

2)运行官方用例WordCount

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar

命令行执行结果:

[Flink04] Flink部署实践,数据湖,flink,大数据

Flink WebUI的首页:

[Flink04] Flink部署实践,数据湖,flink,大数据

Flink WebUI中的结果:

[Flink04] Flink部署实践,数据湖,flink,大数据

3)关闭

执行命令关闭Flink

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.4 Per-Job-Cluster模式(yarn-cluster)

4.4.1 概述

[Flink04] Flink部署实践,数据湖,flink,大数据

一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

4.4.2 常用命令

1)参数说明

flink run -m yarn-cluster --help;可用参数:

[Flink04] Flink部署实践,数据湖,flink,大数据

该模式下不需要先启动 yarn-session,确保 Hadoop 集群是健康的情况下直接提交 Job 命令:

bin/flink -m yarn-cluster <参数> <jar file>

如示例:

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

4.4.3 运行验证

1)启动并执行官方用例WordCount

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

[Flink04] Flink部署实践,数据湖,flink,大数据

访问Yarn WebUI:

[Flink04] Flink部署实践,数据湖,flink,大数据

4.4.4 优缺点

优点:随到随用,只有任务需要运行时才会开启flink集群;运行完就关闭释放资源,资源利用更合理;

缺点:对于小作业不太友好,

适用场景:适合大作业,长时间运行的大作业。

4.5 Application Mode

4.5.1 概述

    Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.

与Per-Job-Cluster的区别: 就是Application Mode下, 用户的main函数式在集群中执行的

官方建议:

出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!

[Flink04] Flink部署实践,数据湖,flink,大数据

4.5.2 运行验证

启动

# /opt/flink/flink/bin/flink run-application -t yarn-application /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

[Flink04] Flink部署实践,数据湖,flink,大数据

访问Yarn WebUI:

[Flink04] Flink部署实践,数据湖,flink,大数据

4.5.3 常见问题

任务提示 Could not allocate the required slot within slot request tim

一、错误日志

Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResource$8(DefaultScheduler.java:539)
        ... 37 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        ... 35 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
        ... 28 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
        ... 29 more

解决方案

将flink的配置文件conf/flink-conf.yaml恢复为初始状态,重新启动flink的Yarn session。

问题FLINK Could not get job jar and dependencies from JAR file: JAR file does not exist:

一、问题描述

使用flink客户端将执行flink提交到Yarn,输入-yjm参数提示错误

# /opt/flink/flink/bin/flink run -m Yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-3.2.4/share/hadoop/common/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Could not get job jar and dependencies from JAR file: JAR file does not exist: -yn

二、问题分析

flink1.8版本之后已弃用该参数,ResourceManager将自动启动所需的尽可能多的容器,以满足作业请求的并行性。

三、解决方案

去掉即可

Deployment took more than 60 seconds. Please check if the requested resources are

一、问题描述

日志信息如下:

INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

截图如下:

[Flink04] Flink部署实践,数据湖,flink,大数据

二、解决方案

配置yarn-site.xml

  <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>102400</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>32</value>
    </property>
   <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>51200</value>
    </property>
Flink读取Hudi表时报错lassNotFoundException: *mapred.FileInputFormat

一、问题现象:

执行“select * from t1;”报错,报错信息如下:

Flink SQL> select * from t1;
……
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hadoop-mapreduce-client-core.jar到Flink/lib中。

读取数据表失败NoSuchMethodError: *Preconditions.checkArgument

一、问题描述

创建表格式如下

CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://10.101.12.140:9000/datas/flink-hudi/test0907/t1',
'table.type' = 'MERGE_ON_READ',
'read.tasks' = '1',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210316134557',
'read.streaming.check-interval' = '4'
);
INSERT INTO t2 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

执行成功,然后执行表内容查询

select * from t2;

报出如下错误:

Flink SQL> select * from t2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputForm
Flink SQL>

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hive-exec-3.1.3.jar到各节点的flink/lib中。(注意hive-exec和hadoop版本的匹配)

启动失败NoSuchMethodError: *Preconditions.checkArgument

一、错误描述

通过bin/yarn-session.sh -d启动yarn-session失败,报错信息如下:

The program finished with the following exception:

org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl.getClient(RpcClientFactoryPBImpl.java:81)
        at org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC.getProxy(HadoopYarnProtoRPC.java:48)
        ……
        ... 21 more
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
        at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
        ……
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.<init>(ApplicationClientProtocolPBClientImpl.java:209)
        ... 26 more

二、错误原因

Preconditions是guava下的工具类,hudi的源码依赖了不同的项目,这些项目使用了不同的guava版本,所报错误是由于运行时guava版本过旧,没有相应的方法。

三、解决方案

在HADOOP_HOME下查询hadoop使用的guava版本,将其拷贝到FLINK_HOME/lib下:

# find ./ -name guava*
./share/hadoop/common/lib/guava-27.0-jre.jar
./share/hadoop/hdfs/lib/guava-27.0-jre.jar

将文件复制到所有yarn集群的FLINK_HOME/lib下。

重新执行bin/yarn-session.sh -d,成功。

通过yarn启动flink失败-连接yarn失败

通过yarn模式启动flink,报出如下异常,关键信息如下:

# /usr/local/flink/bin/yarn-session.sh -d
….
2022-10-27 11:04:30,332 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at /0.0.0.0:8032
…
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: 0.0.0.0/0.0.0.0:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused
……

原因分析:

1)检查是否启动hadoop集群, 如果没有启动, 是无法连接到hadoop的yarn。

2)flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs。

如果正常启动还无法连接yarn, 可以查看一下hadoop的环境变量是否配置好。

在本实例中,时因为无法获取HADOOP_CONF_DIR的配置信息导致问题发生。

二、解决方案

设置HADOOP_CONF_DIR环境变量,并使之生效。

# cat /etc/profile | grep HADOOP_CONF_DIR
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
# source /etc/profile

然后重新启动flink。

BTW:如果已经设置HADOOP_CONF_DIR环境变量,可能由于某种原因HADOOP_CONF_DIR环境变量没有生效,这个原因有很多。

5 参考资料

[01] https://blog.csdn.net/Vector97/article/details/117398947

[02] https://www.jianshu.com/p/8c9c897ea72a文章来源地址https://www.toymoban.com/news/detail-828151.html

到了这里,关于[Flink04] Flink部署实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

    在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍

    2024年04月15日
    浏览(53)
  • Flink 内容分享(十九):理想汽车基于Flink on K8s的数据集成实践

    目录 数据集成的发展与现状 数据集成的落地实践 1. 数据集成平台架构 2. 设计模型 3. 典型场景 4. 异构数据源 5. SQL 形式的过滤条件 数据集成云原生的落地实践 1. 方案选型 2. 状态判断及日志采集 3. 监控告警 4. 共享存储 未来规划 理想汽车数据集成的发展经历了四个阶段:

    2024年02月01日
    浏览(47)
  • Flink:处理大规模复杂数据集的最佳实践深入探究Flink的数据处理和性能优化技术

    作者:禅与计算机程序设计艺术 随着互联网、移动互联网、物联网等新型网络技术的不断发展,企业对海量数据的处理日益依赖,而大数据分析、决策支持、风险控制等领域都需要海量的数据处理能力。如何高效、快速地处理海量数据、提升处理效率、降低成本,是当下处理

    2024年02月13日
    浏览(59)
  • 【大数据】Flink 从入门到实践(一):初步介绍

    Apache Flink 是一个框架和分布式处理引擎,用于在 无边界 和 有边界 数据流上进行 有状态 的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 1.1 处理无界和有界数据 任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志

    2024年02月14日
    浏览(43)
  • Flink + Paimon数据 CDC 入湖最佳实践

    Apache Paimon 最典型的场景是解决了 CDC (Change Data Capture)数据的入湖,看完这篇文章可以了解到: 1、为什么 CDC 入Hive迁移到 Paimon? 2、CDC 入 Paimon 怎么样做到成本最低? 3、Paimon 对比 Hudi有什么样的优势?  Paimon 从 CDC 入湖场景出发,希望提供给你 简单、低成本、低延时 的

    2024年01月16日
    浏览(47)
  • 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月11日
    浏览(41)
  • Flink 数据集成服务在小红书的降本增效实践

    摘要:本文整理自实时引擎研发工程师袁奎,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分: 小红书实时服务降本增效背景 Flink 与在离线混部实践 实践过程中遇到的问题及解决方案 未来展望 点击查看原文视频 演讲PPT 1.1 小红书 Flink 使用场景特点

    2024年02月11日
    浏览(37)
  • Flink与Spring Boot集成实践:搭建实时数据处理平台

    在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Spring Boot则以其快速开发、简化配置而广受欢迎,将两者结合,我们可以快速地搭建起一个实时数据处理平

    2024年04月27日
    浏览(58)
  • Flink Catalog 解读与同步 Hudi 表元数据的最佳实践

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(47)
  • 大数据Flink(五十八):Flink on Yarn的三种部署方式介绍

    文章目录 Flink on Yarn的三种部署方式介绍 一、​​​​​​​Session模式

    2024年02月13日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包