Flink流批一体计算(5):部署运行模式

这篇具有很好参考价值的文章主要介绍了Flink流批一体计算(5):部署运行模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

集群运行模式

1.local模式

2.standalone模式

3.Flink on YARN模式

本地模式

Standalone 模式

Flink on Yarn 模式


集群运行模式

类似于SparkFlink也有各种运行模式,其中主要支持三种:local模式、standalone模式以及Flink on YARN模式。

每种模式都有特定的使用场景,接下来一起了解一下各种运行模式。

1.local模式

适用于测试调试。Flink可以运行在LinuxmacOSWindows系统上。local模式的安装唯一需要的是Java 1.7.x或更高版本,运行时会启动JVM,主要用于调试代码,一台服务器即可运行。

2.standalone模式

适用于Flink自主管理资源。Flink自带集群模式standalone,主要是将资源调度管理交给Flink集群自己来处理。standalone是一种集群模式,可以有一个或者多个主节点(JobManagerHA模式,用于资源管理调度、任务管理、任务划分等工作),多个从节点(TaskManager,主要用于执行JobManager分解出来的任务)。

3.Flink on YARN模式

使用YARN来统一调度和管理资源,一般在学习研究环节或资源不充足的情况下,采用local模式部署即可,生产环境中Flink on YARN模式比较常见。

Flink on YARN任务提交的工作流程见下一节。

本地模式

Flink的local模式部署安装

在local模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟flink的进程,适用于测试开发调试等,这种模式下,不用更改任何配置,只需要保证jdk8安装正常即可。

前提条件:

Java 1.8+

部署步骤:

1.下载安装包并解压,下载版本较新且稳定的版本:

# wget https://archive.apache.org/dist/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz

进行解压

# tar -zxf flink-1.16.0-bin-scala_2.12.tgz

2.直接使用脚本启动

flink在处于local模式下,不需要更改任何配置,直接解压之后启动即可

执行以下命令直接启动local模式

cd /data-ext/flink-1.16.0
bin/start-cluster.sh

关闭local模式

cd /data-ext/flink-1.16.0
bin/stop-cluster.sh

3.启动成功之后进行检查

执行jps就能查看到启动了两个进程

# jps

23792 TaskManagerRunner

23514 StandaloneSessionClusterEntrypoint

webUI界面访问

启动两个进程成功之后,访问8081端口号即可访问到flink的web管理界

http://master:8081/#/overview

4. 运行flink自带的测试

master使用linux的nc命令来向socket当中发送一些单词。

nc是netcat的简写,是一个功能强大的网络工具,有着网络界的瑞士军刀美誉。nc命令在linux系统中实际命令是ncat,nc是软连接到ncat。

# sudo yum -y install nc
# nc -lk 8000

打开master另一个窗口,启动flink的自带的单词统计程序,接受输入的socket数据并进行统计

cd /data-ext/flink-1.16.0
bin/flink run examples/streaming/SocketWindowWordCount.jar   --hostname localhost  --port 8000

查看统计结果:

flink自带的测试用例统计结果在log文件夹下面

master执行以下命令查看统计结果

cd /data-ext/flink-1.16.0/log
tail -200f flink-root-taskexecutor-0-VM-0-9-centos.out

Standalone 模式

Standalone 模式是集群模式的一种,但是这种模式一般并不运行在生产环境中,原因和 on yarn 模式相比:

Standalone 模式的部署相对简单,可以支持小规模,少量的任务运行;

Stabdalone 模式缺少系统层面对集群中 Job 的管理,容易遭成资源分配不均匀;

资源隔离相对简单,任务之间资源竞争严重。

前提条件:

准备两台服务器,一台用来管理任务(JobManager),一台用来执行任务(TaskManager )

管理任务的服务器一台即可,执行任务的服务器后续可根据实际需求无限制扩充节点

每台服务器安装java 1.8,且设置JAVA_HOME

实现两台服务器之间ssh免密登录

服务器列表:

NAME

IP

OS-IMAGE

Java

master

192.168.0.220

el7.x86_64

1.8.0_291

node01

192.168.0.6

el7.x86_64

1.8.0_291

node02

192.168.0.8

el7.x86_64

1.8.0_291

部署步骤

1.解压1.16.0版本的flink文件

2.配置系统环境变量

# vim /etc/profile

export FLINK_HOME=/data-ext/flink-1.16.0

export PATH=$PATH:$FLINK_HOME/bin

刷新系统环境变量,使之生效

# source /etc/profile

3. 编辑conf文件

输入指令 cd flink-1.16.0/conf/  进入conf目录

输入指令 vim flink-conf.yaml 编辑conf文件,这个文件是核心配置文件

jobmanager.rpc.address

配置jobmanager rpc 地址

选择一个节点作为master节点(JobManager),设置jobmanager.rpc.address 配置项为该节点的IP或者主机名。

确保所有节点有有一样的jobmanager.rpc.address 配置。

  • 修改taskmanager内存大小

taskmanager.memory.process.size: 2048m

taskmanager.numberOfTaskSlots

修改taskmanager的taskslot个数,Flink中每台服务器的卡槽可以在conf文件中配置,默认是1

我们修改为2 ,如果此值大于1,TaskManager可以使用多个CPU内核,单个TaskManager会将获取函数或运算符并行运行。

修改并行度

parallelism.default: 4

4.配置master

vim masters

master:8081

5. 编辑workers

输入 vim workers 编辑该文件,这个文件用来配置flink集群子节点,默认localhost

与HDFS配置类似,编辑文件conf/slaves并输入每个工作节点的IP/主机名,多个节点就换行写入多个。每个工作节点稍后将运行TaskManager。

master负担重的话,依然可以选择master不作为TaskManager节点(去掉localhost)。

# vim workers

Node01

Node02

5. 分发配置文件

通过 scp 将配置文件分发到子服务器即可

6. 服务启动与停止

启动集群:

bin/start-cluster.sh

jps查看进程

关闭集群:

bin/stop-cluster.sh

7. HA配置

7.1服务器节点设计

主节点

从节点

部署方式

master

node01

Standalone-HA

7.2 配置环境变量

# vim /etc/profile

export HADOOP_HOME=/data-ext/hadoop-3.2.4

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/Hadoop

刷新系统变量环境

# source /etc/profile

7.3 编辑conf/ flink-conf.yaml,配置flink

# vim conf/flink-conf.yaml

7.3.1 配置zoo

新建snapshot存放的目录,在FLINK_HOME目录下执行

# mkdir -p tmp/zookeeper

修改conf下zoo.cfg配置

# vim zoo.cfg

# The directory where the snapshot is stored.

dataDir=/data-ext/flink-1.16.0/tmp/zookeeper

# The port at which the clients will connect

clientPort=2181

# ZooKeeper quorum peers

server.1=master:2888:3888

Flink on Yarn 模式

Flink on Yarn 模式的原理是依靠YARN来调度Flink 任务,目前在企业中使用较多。这种模式的好处是可以充分利用集群资源,提高集群机器的利用率,并且只需要1套Hadoop集群,就可以执行MR和Spark任务,还可以执行 Flink 任务等,操作非常方便,不需要维护多套集群,运维方面也很轻松。Flink on Yarn 模式需要依赖 Hadoop 集群,并且Hadoop的版本需要是 2.2 及以上。

当启动一个新的 Flink YARN Client 会话时,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传Flink 配置和 JAR 文件到 HDFS。

客 户 端 的 下 一 步 是 请 求 一 个 YARN 容 器 启 动 ApplicationMaster 。 JobManager 和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM 就能够知道JobManager 的地址,它会为 TaskManager 生成一个新的 Flink 配置文件(这样它才能连上 JobManager),该文件也同样会被上传到 HDFS。另外,AM 容器还提供了 Flink 的Web 界面服务。Flink 用来提供服务的端口是由用户和应用程序 ID 作为偏移配置的,这使得用户能够并行执行多个 YARN 会话。

之后,AM 开始为 Flink 的 TaskManager 分配容器(Container),从 HDFS 下载 JAR 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了。

部署步骤:

1.修改conf/flink-conf.yaml配置,添加如下两项:

#用户提交作业失败时,重新执行次数

yarn.application-attempts: 4

#设置Task在所有节点平均分配

cluster.evenly-spread-out-slots: true

2. 下载hadoop依赖包,并包复制到flink的lib目录下

 flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar

3. 启动测试(session模式)

Session-Cluster:是在 YARN 中提前初始化一个 Flink集群(称为Flink yarn-session),开辟指定的资源,以后的 Flink 任务都提交到这里。这个Flink 集群会常驻在YARN 集群中,除非手工停止。这种方式创建的 Flink 集群会独占资源,不管有没有 Flink 任务在执行,YARN 上面的其他任务都无法使用这些资源。

# 主节点中执行

bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 1

-tm 表示每个 TaskManager 的内存大小

-s 表示每个 TaskManager 的 slots 数量

-d 表示以后台程序方式运行

注意:此时提交的任务都通过该会话(session)执行,不会再申请yarn资源

查看正在运行的任务列表

yarn application -list

4.测试

  1. 创建一个文本文件words.txt

we think we can do it by ourself.

we can not think we can guess ourself.

think think think

we can think, so we can do it.

  1. 将文件上传到hdfs上
# hdfs dfs -copyFromLocal  words.txt  /
  1. 通过session模式,在yarn上提交任务
# bin/flink run examples/batch/WordCount.jar --input hdfs://master:8020/wordcount.txt

5. 关闭session模式,杀掉正在运行的任务文章来源地址https://www.toymoban.com/news/detail-499076.html

yarn application kill

到了这里,关于Flink流批一体计算(5):部署运行模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流批一体计算(3):FLink作业调度

    架构 所有的分布式计算引擎都需要有集群的资源管理器,例如:可以把MapReduce、Spark程序运行在YARN集群中、或者是Mesos中。Flink也是一个分布式计算引擎,要运行Flink程序,也需要一个资源管理器。而学习每一种分布式计算引擎,首先需要搞清楚的就是:我们开发的分布式应用

    2024年02月10日
    浏览(36)
  • Flink流批一体计算(4):Flink功能模块

    目录 Flink功能架构 Flink输入输出 Flink功能架构 Flink是分层架构的分布式计算引擎,每层的实现依赖下层提供的服务,同时提供抽象的接口和服务供上层使用。 Flink 架构可以分为4层,包括Deploy部署层、Core核心层、API层和Library层 部署层:主要涉及Flink的部署模式。Flink支持多种

    2024年02月10日
    浏览(37)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(30)
  • Flink流批一体计算(10):PyFlink Tabel API

    简述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它构建可扩展的批处理和流处理任务,例如实时数据处理管道、大规模探索性数据分析、机器学习( ML )管道和 ETL 处理。 如果你对 Python 和 Pandas 等库已经比较熟悉,那么 PyFlink 可以让你更轻松地利用 Flink 生态系统的全部功

    2024年02月11日
    浏览(31)
  • Flink流批一体计算(16):PyFlink DataStream API

    目录 概述 Pipeline Dataflow 代码示例WorldCount.py 执行脚本WorldCount.py 概述 Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组

    2024年02月11日
    浏览(38)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(31)
  • Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

    目录 1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。 2. File Sink File Sink Format Types  Row-encoded Formats  Bulk-encoded Formats  桶分配 滚动策略 3. 如何输出结果 Print 集合数据到客户端,execute_and_collect方法将收集数据到客户端内存 将结果发送到DataStream sink conne

    2024年02月11日
    浏览(32)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(38)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(28)
  • Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

    目录 概述 设置重启策略 什么是flink的重启策略(Restartstrategy) flink的重启策略(Restartstrategy)实战 flink的4种重启策略 FixedDelayRestartstrategy(固定延时重启策略) FailureRateRestartstrategy(故障率重启策略) NoRestartstrategy(不重启策略) 配置State Backends 以及 Checkpointing Checkpoint 启用和配置

    2024年02月13日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包