Flink高手之路2-Flink集群的搭建

这篇具有很好参考价值的文章主要介绍了Flink高手之路2-Flink集群的搭建。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


flink集群,Flink高手之路,flink,大数据,spark

Flink高手之路2-Flink集群的搭建

一、Flink的安装模式

1.本地local模式

本地单机模式,一般用于测试环境是否搭建成功,很少使用

2.独立集群模式standalone

Flink自带集群,开发测试使用

3.高可用的独立集群模式standalone HA

Flink自带集群,用于开发测试

4.基于yarn模式Flink on yarn

计算资源统一交给hadoop的yarn进行管理,用于生产环境

二、基础环境

  • 虚拟机
  • jdk1.8
  • ssh免密登录

三、Flink的local模式安装

1. 下载安装包

flink集群,Flink高手之路,flink,大数据,spark

点击:

flink集群,Flink高手之路,flink,大数据,spark

点击下载:

flink集群,Flink高手之路,flink,大数据,spark

2. 上传服务器

找到安装包,并上传:

flink集群,Flink高手之路,flink,大数据,spark

上传成功:

flink集群,Flink高手之路,flink,大数据,spark

3.解压

tar xzvf flink-1.16.1-bin-scala_2.12.tgz -C /export/servers/

flink集群,Flink高手之路,flink,大数据,spark

进入 Servers 目录下:

flink集群,Flink高手之路,flink,大数据,spark

进入 Flink 目录下:

flink集群,Flink高手之路,flink,大数据,spark

进入 bin 目录下:

flink集群,Flink高手之路,flink,大数据,spark

4. 配置环境变量

flink集群,Flink高手之路,flink,大数据,spark

5. 使环境变量起作用

flink集群,Flink高手之路,flink,大数据,spark

6.测试显示版本

flink集群,Flink高手之路,flink,大数据,spark

7.测试scala shell交互命令行(可跳过)

需要flink的版本是1.12及以下的版本,在高版本中 scala shell 被舍去了。

1)安装一下 Flink 1.12 版本

上传文件

flink集群,Flink高手之路,flink,大数据,spark

上传成功:

flink集群,Flink高手之路,flink,大数据,spark

解压

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

2)启动命令行

启动 shell

bin/start-scala-shell.sh local

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

3)web ui查看

flink集群,Flink高手之路,flink,大数据,spark

4)scala命令行示例-单词计数(批处理)
  • 准备好数据文件

flink集群,Flink高手之路,flink,大数据,spark

benv.readTextFile("/root/a.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()

flink集群,Flink高手之路,flink,大数据,spark

5)scala命令行示例2-窗口计数(流处理)

flink集群,Flink高手之路,flink,大数据,spark

6)退出命令行

输入 :quit 或者 Ctrl + d

flink集群,Flink高手之路,flink,大数据,spark

8.local模式测试

启动集群并查看进程

flink集群,Flink高手之路,flink,大数据,spark

9.查看Flink的web ui

启动失败,需要修改/etc/hosts文件,添加localhost的定义

flink集群,Flink高手之路,flink,大数据,spark

若直接添加 192.168.92.128 localhost在启动 Hbase时会出现如下错误

flink集群,Flink高手之路,flink,大数据,spark

修改完成后,启动成功:
flink集群,Flink高手之路,flink,大数据,spark

10.local集群运行测试任务-单词计数

1)先准备好数据文件

flink集群,Flink高手之路,flink,大数据,spark

2)找到单词计数的jar包

flink集群,Flink高手之路,flink,大数据,spark

3)提交任务到集群上运行

出现错误:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.

原因:没有启动Flink集群

启动集群:

flink集群,Flink高手之路,flink,大数据,spark

运行成功:

flink集群,Flink高手之路,flink,大数据,spark

执行成功后,在/root目录下出现 output 目录

flink集群,Flink高手之路,flink,大数据,spark

运行结果

flink集群,Flink高手之路,flink,大数据,spark

4)web ui任务执行过程查看

flink集群,Flink高手之路,flink,大数据,spark

点击任务

flink集群,Flink高手之路,flink,大数据,spark

11.Flink本地(local)模式任务执行的原理

Flink程序提交任务到 JobClient ,JobClient 提交任务到 JobManager【Master】,JobManager 分发任务给TaskManager,TaskManager执行任务,执行任务后发送状态给 JobManager,JobManager 将结果返回到 JobClient 。

flink集群,Flink高手之路,flink,大数据,spark

四、Flink的独立集群Standalone模式的安装及测试

1.集群规划

服务器 JobManager TaskManager
hadoop001
hadoop002
hadoop003

2.下载安装包并上传服务器解压

同上

3.配置环境变量并使环境变量起作用

同上

4.修改Flink的配置文件

flink集群,Flink高手之路,flink,大数据,spark

1)修改yaml或者yml文件的注意事项
  • 不同的等级用冒号隔开,同时缩进格式
  • 次等级的前面是空格,不能使用制表符
  • 冒号之后如果有值,那么冒号与值之间用至少一个空格分隔,不能紧贴在一起

flink集群,Flink高手之路,flink,大数据,spark

2)修改flink-conf.yaml
  • flink1.16版本的配置
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


#==============================================================================
# Common
#==============================================================================

# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.

jobmanager.rpc.address: hadoop001

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123

# The host interface the JobManager will bind to. By default, this is localhost, and will prevent
# the JobManager from communicating outside the machine/container it is running on.
# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
#
# To enable this, set the bind-host address to one that has access to an outside facing network
# interface, such as 0.0.0.0.

jobmanager.bind-host: 0.0.0.0


# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.

jobmanager.memory.process.size: 1600m

# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent
# the TaskManager from communicating outside the machine/container it is running on.
# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
#
# To enable this, set the bind-host address to one that has access to an outside facing network
# interface, such as 0.0.0.0.

taskmanager.bind-host: 0.0.0.0

# The address of the host on which the TaskManager runs and can be reached by the JobManager and
# other TaskManagers. If not specified, the TaskManager will try different strategies to identify
# the address.
#
# Note this address needs to be reachable by the JobManager and forward traffic to one of
# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
#
# Note also that unless all TaskManagers are running on the same machine, this address needs to be
# configured separately for each TaskManager.

taskmanager.host: hadoop001

# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

taskmanager.memory.process.size: 1728m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
# taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 2

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 2

# The default file system scheme and authority.
# 
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# 
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
#
# high-availability.storageDir: hdfs:///flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
# high-availability.zookeeper.quorum: localhost:2181


# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
#
# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details.
#
# execution.checkpointing.interval: 3min
# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
# execution.checkpointing.max-concurrent-checkpoints: 1
# execution.checkpointing.min-pause: 0
# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
# execution.checkpointing.timeout: 10min
# execution.checkpointing.tolerable-failed-checkpoints: 0
# execution.checkpointing.unaligned: false
#
# Supported backends are 'hashmap', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: hashmap

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false

# The failover strategy, i.e., how the job computation recovers from task failures.
# Only restart tasks that may have been affected by the task failure, which typically includes
# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.

jobmanager.execution.failover-strategy: region

#==============================================================================
# Rest & web frontend
#==============================================================================

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8081

# The address to which the REST client will connect to
#
rest.address: hadoop001

# Port range for the REST and web server to bind to.
#
#rest.bind-port: 8080-8090

# The address that the REST & web server binds to
# By default, this is localhost, which prevents the REST & web server from
# being able to communicate outside of the machine/container it is running on.
#
# To enable this, set the bind address to one that has access to outside-facing
# network interface, such as 0.0.0.0.
#
rest.bind-address: 0.0.0.0

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

#web.submit.enable: false

# Flag to specify whether job cancellation is enabled from the web-based
# runtime monitor. Uncomment to disable.

#web.cancel.enable: false

#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
# 
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb

#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL

# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.

# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user

# The configuration below defines which JAAS login contexts

# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================
# ZK Security Configuration
#==============================================================================

# Below configurations are applicable if ZK ensemble is configured for security

# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper

# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000


  • Flink1.12版本的配置

flink集群,Flink高手之路,flink,大数据,spark

3)master

flink集群,Flink高手之路,flink,大数据,spark

4)workers

flink集群,Flink高手之路,flink,大数据,spark

5.分发文件

1)分发flink

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

2)分发/etc/profile

flink集群,Flink高手之路,flink,大数据,spark

3)使得配置文件起作用

flink集群,Flink高手之路,flink,大数据,spark

6.启动Flink集群,并查看相关进程

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

7.web ui查看

flink集群,Flink高手之路,flink,大数据,spark

8.集群测试

1)提交单词计数的任务,使用默认的参数

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

2)提交单词计数的任务,使用自定义参数

准备好数据文件

flink集群,Flink高手之路,flink,大数据,spark

上传hdfs

首先要确保 hdfs 集群已经启动

flink集群,Flink高手之路,flink,大数据,spark

发现我们以前已经上传过了

flink集群,Flink高手之路,flink,大数据,spark

提交命令

flink run ./WordCount.jar --input hdfs://hadoop001:9000/input --output hdfs://hadoop001:9000/output

flink集群,Flink高手之路,flink,大数据,spark

出现错误:

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.

这个错误需要把flink-1.16.1与hadoop3进行集成。

flink集群,Flink高手之路,flink,大数据,spark

3)添加hadoop classpath配置
export HADOOP_CLASSPATH=`hadoop classpath`

flink集群,Flink高手之路,flink,大数据,spark

4)分发并激活环境变量

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

5)下载flink和hadoop的连接工具,上传到flink的lib文件夹

去maven中央仓库下载如下jar包并上传到 flink/lib文件夹中

https://mvnrepository.com/artifact/commons-cli/commons-cli/1.5.0

https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber

这是为了集成hadoop,而shaded依赖已经解决了相关的jar包冲突等问题,该jar包属于第三方jar包,官网有链接,但是并没有hadoop 3.X的,这个直接在maven中央仓库搜索倒是可以搜得到。

flink集群,Flink高手之路,flink,大数据,spark

上传 jar 包到lib目录下

flink集群,Flink高手之路,flink,大数据,spark

分发 lib 目录到hadoop002和hadoop003

flink集群,Flink高手之路,flink,大数据,spark

6)重新启动flink集群

flink集群,Flink高手之路,flink,大数据,spark

7)重新提交单词计数的任务,使用自定义参数

flink集群,Flink高手之路,flink,大数据,spark

查看 flink web ui

flink集群,Flink高手之路,flink,大数据,spark

查看 hdfs web UI

flink集群,Flink高手之路,flink,大数据,spark

点击一个文件查看

flink集群,Flink高手之路,flink,大数据,spark

9.工作原理

flink集群,Flink高手之路,flink,大数据,spark

五、独立集群高可用Standalone-HA搭建

1.集群规划

服务器 JobManager TaskManager
hadoop001 y y
hadoop002 y y
hadoop003 n y

2.修改flink的配置文件

1)修改flink-conf.yaml文件

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

2)修改masters文件

flink集群,Flink高手之路,flink,大数据,spark

3)不用修改workers文件

3.同步配置文件

分发到Hadoop002:

flink集群,Flink高手之路,flink,大数据,spark

分发到Hadoop003:

flink集群,Flink高手之路,flink,大数据,spark

4.修改hadoop002上的flink-conf.yaml文件

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

注意:12.7版本下只需要修改一处就可以了,16.1需要修改3处,否则会提交任务失败。

5.启动集群

1)启动zookeeper

启动ZooKeeper,查看ZooKeeper的状态:

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

2)启动hdfs
3)启动yarn

flink集群,Flink高手之路,flink,大数据,spark

4)启动flink集群

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

6.flink的web ui查看

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

7.集群的测试

1)单词计数使用默认的参数

flink集群,Flink高手之路,flink,大数据,spark

2)杀掉hadoop001的master进程

flink集群,Flink高手之路,flink,大数据,spark

此时查看web ui,hadoop001无法访问,hadoop002还可以继续访问
flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

3)再次提交单词计数的任务(使用默认参数)

flink集群,Flink高手之路,flink,大数据,spark

集群能正常工作,说明高可用在起作用

4)接着杀掉hadoop002的master

flink集群,Flink高手之路,flink,大数据,spark

此时,node2的web ui也无法访问
flink集群,Flink高手之路,flink,大数据,spark

再次提交任务,出现错误,无法运行任务

flink集群,Flink高手之路,flink,大数据,spark

5)单词计数,使用自定义参数

重启集群

flink集群,Flink高手之路,flink,大数据,spark

删除hdfs上以前创建的output文件夹

flink集群,Flink高手之路,flink,大数据,spark

提交任务,使用之前上传的数据

flink run examples/batch/WordCount.jar --input hdfs://hadoop001:9000/input --output hdfs://hadoop001:9000/output

flink集群,Flink高手之路,flink,大数据,spark

查看结果

flink集群,Flink高手之路,flink,大数据,spark

杀掉hadoop001的master进程,并再次提交任务

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

再次删除hdfs上之前创建的output文件夹

flink集群,Flink高手之路,flink,大数据,spark

再次提交任务,可以正常运行并查看结果,说明高可用搭建成功

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

8.工作原理

flink集群,Flink高手之路,flink,大数据,spark

六、Flink on Yarn模式集群搭建及测试

1.为什么要使用Flink on Yarn

  • yarn管理资源,可以按需使用,提高整个集群的资源利用率
  • 任务有优先级,可以根据优先级合理的安排任务运行作用
  • 基于yarn的调度系统,能够自动化的处理各个角色的容错

2.集群规划

跟standalone保持一致

服务器 JobManager TaskManager
hadoop001 y y
hadoop002 y y
hadoop003 n y

3.修改yarn的配置

flink集群,Flink高手之路,flink,大数据,spark

4.启动相关的服务

  • zookeeper
  • hdfs
  • yarn
  • flink
  • historyserver(可选)

flink集群,Flink高手之路,flink,大数据,spark

启动历史服务器

flink集群,Flink高手之路,flink,大数据,spark

5.flink on yarn提交任务的模式

有两种模式

  • session模式 :会话模式
  • per-job模式:每任务模式

flink集群,Flink高手之路,flink,大数据,spark

6.Session模式提交任务

1)开启会话(session)

flink集群,Flink高手之路,flink,大数据,spark

语法:

yarn-session.sh -n 2 -tm 800 -s 1 -d

说明:

  • n:表示申请容器的数量,也就是worker的数量,也就是cpu的核心数
  • tm:表示给个worker(TaskManager)的内存大小
  • s:表示每个worker的slot的数量
  • d:表示后台运行

启动一个会话

yarn-session.sh -n 2 -tm 800 -s 1 -d

flink集群,Flink高手之路,flink,大数据,spark

此时的进程

flink集群,Flink高手之路,flink,大数据,spark

web ui的查看

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark
flink集群,Flink高手之路,flink,大数据,spark

2)提交任务-单词计数

使用的默认的参数,提交任务
flink集群,Flink高手之路,flink,大数据,spark

查看yarn的web ui

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

3)再次提交任务

flink集群,Flink高手之路,flink,大数据,spark

再次查看yarn的web ui

flink集群,Flink高手之路,flink,大数据,spark

7.关闭yarn-session

flink集群,Flink高手之路,flink,大数据,spark

关闭会话
flink集群,Flink高手之路,flink,大数据,spark

查看进程
flink集群,Flink高手之路,flink,大数据,spark

查看yarn的web ui

flink集群,Flink高手之路,flink,大数据,spark

8.Per-Job模式提交任务

1)语法
flink run -m yarn-cluster -yjm 1024 -ytm 1024 examples/batch/WordCount.jar 

说明:

  • m:jobmanager的地址
  • yjm:jobmanager的内存大小
  • ytm:taskmanager的内存大小
2)提交任务

flink集群,Flink高手之路,flink,大数据,spark

3)查看yarn的web ui

flink集群,Flink高手之路,flink,大数据,spark

执行过程中出现错误
flink集群,Flink高手之路,flink,大数据,spark

解决错误,可以修改flink的配置

flink集群,Flink高手之路,flink,大数据,spark

分发配置文件,并重启flink

4)再次提交任务

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

5)查看jps,并没有相关的进程,也就是当任务执行完成后,进程自动关闭

flink集群,Flink高手之路,flink,大数据,spark

9.flink任务提交参数总结

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

flink集群,Flink高手之路,flink,大数据,spark

参考文章:

flink启动后web访问问题

Flink高手之路:Flink的环境搭建

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:Hadoop is not in the classpath/dependencies

flink 1.15.2集群搭建(Flink Standalone模式)文章来源地址https://www.toymoban.com/news/detail-753310.html

到了这里,关于Flink高手之路2-Flink集群的搭建的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink集群搭建

    ⚠申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址。 全文共计3696字,阅读大概需要3分钟 🌈更多学习内容, 欢迎👏关注👀【文末】我的个人微信公众号:不懂开发的程序猿 个人网站:https://jerry-jy.co/ 掌握Flink集群搭建的过程。 掌握Flink集群的启动、停

    2024年02月08日
    浏览(36)
  • Flink 1.17.0集群搭建

    集群角色分配 Hostname IP Role hadoop01 192.168.126.132 JobManager TaskManager hadoop02 192.168.126.133 TaskManager hadoop03 192.168.126.134 TaskManager 下载flink安装包 https://archive.apache.org/dist/flink/flink-1.17.0/  上传至hadoop01并解压:  修改conf/flink-conf.yaml(从flink1.16版本开始,需要修改以下配置) 修改conf/

    2024年02月12日
    浏览(35)
  • Linux多虚拟机集群化配置详解(Zookeeper集群、Kafka集群、Hadoop集群、HBase集群、Spark集群、Flink集群、Zabbix、Grafana部署)

    前面安装的软件,都是以单机模式运行的,学习大数据相关的软件部署,后续安装软件服务,大多数都是以集群化(多台服务器共同工作)模式运行的。所以,需要完成集群化环境的前置准备,包括创建多台虚拟机,配置主机名映射,SSH免密登录等等。 我们可以使用VMware提供

    2024年02月04日
    浏览(52)
  • Flink 本地单机/Standalone集群/YARN模式集群搭建

    本文简述 Flink 在 Linux 中安装步骤,和示例程序的运行。需要安装 JDK1.8 及以上版本。 下载地址:下载 Flink 的二进制包 点进去后,选择如下链接: 解压 flink-1.10.1-bin-scala_2.12.tgz ,我这里解压到 soft 目录 解压后进入 Flink 的 bin 目录执行如下脚本即可 进入 Flink 页面看看,如果

    2024年02月05日
    浏览(45)
  • 基于Hadoop搭建Flink集群详细步骤

    目录 1.xftp上传flink压缩包至hadoop102的/opt/software/目录下 2.解压flink压缩包至/opt/module/目录下 3. 配置flink-conf.yaml 4.配置masters 5.配置workers 6.配置环境变量my_env.sh 7.重启环境变量 8.分发/opt/module/flink-1.13.0和/etc/profile.d/my_env.sh 9.另外两台重启环境变量 10.开启hadoop集群和flink集群 11.浏

    2024年02月09日
    浏览(69)
  • Flink三种模式介绍&集群的搭建

    目录 Flink是什么  Flink部署模式 会话模式(Session Mode ) 单作业模式(Per-Job Mode) 应用模式(Application Mode) Flink集群搭建 Standalone运行模式  会话模式 单作业模式部署 应用模式部署 YARN运行模式  会话模式部署 单作业模式部署 应用模式部署 Flink是“数据流上的有状态计算”

    2024年02月09日
    浏览(37)
  • 206.Flink(一):flink概述,flink集群搭建,flink中执行任务,单节点、yarn运行模式,三种部署模式的具体实现

    Flink官网地址:Apache Flink® — Stateful Computations over Data Streams | Apache Flink Flink是一个 框架 和 分布式处理引擎 ,用于对 无界 和 有界 数据流进行 有状态计算 。 无界流(流): 有定义流的开始,没有定义结束。会无休止产生数据 无界流数据必须持续处理 有界流(批): 有定

    2024年02月11日
    浏览(49)
  • 搭建单机版K8S运行Flink集群

    环境要求 操作系统: CentOS 7.x 64位 Kubernetes版本:v1.16.2 Docker版本:19.03.13-ce Flink版本:1.14.3 使用中国YUM及镜像源  1.安装Kubernetes: 1.1 创建文件:/etc/yum.repos.d/kubernetes.repo,内容如下: 1.2  执行安装命令:  1.3 启动kubelet服务并设置开机自启: 2.安装Docker: 2.1 创建文件:

    2023年04月26日
    浏览(48)
  • Hive & Spark & Flink 数据倾斜

    绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败, 这样的现象为数据倾斜现象。 任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 redu

    2024年02月07日
    浏览(41)
  • k8s 搭建基于session模式的flink集群

    不废话直接上代码,都是基于官网的,在此记录一下 Kubernetes | Apache Flink flink-configuration-configmap.yaml jobmanager-service.yaml  Optional service, which is only necessary for non-HA mode. Session cluster resource definitions # jobmanager-session-deployment-non-ha.yaml taskmanager-session-deployment.yaml  kubectl apply -f xxx.ya

    2024年02月10日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包