FlinkOnYarn 监控 flink任务

这篇具有很好参考价值的文章主要介绍了FlinkOnYarn 监控 flink任务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink任务一般为实时不断运行的任务,如果没有任务监控,
任务异常时无法第一时间处理会比较麻烦。
这里通过调用API接口方式来获取参数,实现任务监控。

Flink任务监控(基于API接口编写shell脚本)
一 flink-on-yarn 模式
二 编写shell 脚本 

监控集群指标

http://rm-http-address:port/ws/v1/cluster/metrics

 响应正文

<clusterMetrics>

<appsSubmitted>**</appsSubmitted>

<appsCompleted>**</appsCompleted>

<appsPending>0</appsPending>

<appsRunning>**</appsRunning>

<appsFailed>**</appsFailed>

<appsKilled>**</appsKilled>

<reservedMB>0</reservedMB>

<availableMB>**</availableMB>

<allocatedMB>**</allocatedMB>

<pendingMB>0</pendingMB>

<reservedVirtualCores>0</reservedVirtualCores>

<availableVirtualCores>**</availableVirtualCores>

<allocatedVirtualCores>**</allocatedVirtualCores>

<pendingVirtualCores>0</pendingVirtualCores>

<containersAllocated>**</containersAllocated>

<containersReserved>0</containersReserved>

<containersPending>0</containersPending>

<totalMB>**</totalMB>

<totalVirtualCores>**</totalVirtualCores>

<utilizedMBPercent>53</utilizedMBPercent>

<utilizedVirtualCoresPercent>**</utilizedVirtualCoresPercent>

<rmSchedulerBusyPercent>0</rmSchedulerBusyPercent>

<totalNodes>**</totalNodes>

<lostNodes>0</lostNodes>

<unhealthyNodes>**</unhealthyNodes>

<decommissioningNodes>0</decommissioningNodes>

<decommissionedNodes>0</decommissionedNodes>

<rebootedNodes>0</rebootedNodes>

<activeNodes>**</activeNodes>

<shutdownNodes>0</shutdownNodes>

<totalAllocatedContainersAcrossPartition>0</totalAllocatedContainersAcrossPartition>

<crossPartitionMetricsAvailable>false</crossPartitionMetricsAvailable>

</clusterMetrics>

clusterMetrics 对象的元素

项目 数据类型 描述
apps已提交 int 提交的申请数量
应用已完成 int 完成的申请数量
apps待定 int 待处理的申请数量
应用程序正在运行 int 正在运行的应用程序数
apps失败 int 失败的应用程序数
应用已杀死 int 被终止的应用程序数
保留MB 预留的内存量(以 MB 为单位)
可用MB 可用内存量(以 MB 为单位)
已分配MB 分配的内存量(以 MB 为单位)
总MB 总内存量(以 MB 为单位)
保留虚拟核心 保留的虚拟核心数
可用虚拟核心 可用虚拟核心数
分配的虚拟核心 分配的虚拟核心数
totalVirtualCores 虚拟核心数 虚拟核心总数
容器已分配 int 分配的容器数
容器保留 int 保留的容器数
容器挂起 int 待处理的容器数
总节点数 int 节点总数
活动节点 int 活动节点数
丢失节点 int 丢失的节点数
不健康的节点 int 不正常的节点数
停用节点 int 停用的节点数
已停用节点 int 停用的节点数
rebooted节点 int 重新启动的节点数
shutdown节点 int 关闭的节点数

获取所有application

curl -s http://XXX:8088/ws/v1/cluster/apps

获取 state值为 RUNNING 的application任务

curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING 

获取这个任务单个信息 

curl -s http://XXX:8088/ws/v1/cluster/apps/application_1619074605427_0063 |jq .app.state

请注意,根据安全设置,用户可能无法看到所有字段。 

项目 数据类型 描述
编号 字符串 应用程序 ID
用户 字符串 启动应用程序的用户
名字 字符串 应用程序名称
队列 字符串 提交应用程序的队列
字符串 根据 ResourceManager 的应用程序状态 - 有效值是 YarnApplicationState 枚举的成员:NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED
finalStatus 字符串 应用程序的最终状态(如果已完成)(由应用程序本身报告)有效值是 FinalApplicationStatus 枚举的成员:UNDEFINED、SUCCEEDED、FAILED、KILLED
进展 以百分比表示的申请进度
trackingUI 字符串 跟踪 URL 当前指向的位置 - 历史记录(用于历史记录服务器)或 ApplicationMaster
trackingUrl 字符串 可用于跟踪应用程序的 Web URL
诊断 字符串 详细的诊断信息
clusterId 集群 ID
应用程序类型 字符串 应用程序类型
application标签 字符串 应用程序的逗号分隔标记
优先权 字符串 所提交申请的优先权
开始时间 应用程序启动的时间(自纪元以来的毫秒)
完成时间 应用程序完成的时间(以纪元以来的毫秒数为单位)
elapsedTime 自应用程序启动以来经过的时间(以毫秒为单位)
amContainer日志 字符串 应用程序主容器日志的 URL
amHostHttp地址 字符串 应用程序主机的节点 http 地址
amRPCAddress 字符串 应用程序主机的 RPC 地址
已分配MB int 分配给应用程序正在运行的容器的内存总和(以 MB 为单位)
已分配VCores int 分配给应用程序正在运行的容器的虚拟核心的总和
running容器 int 当前为应用程序运行的容器数
memorySeconds 应用程序分配的内存量(兆字节-秒)
vcore秒数 应用程序分配的 CPU 资源量(虚拟内核 - 秒)
queueUsagePercentage 应用正在使用的队列资源的百分比
clusterUsage百分比 应用正在使用的群集资源的百分比。
抢占ResourceMB 抢占式容器使用的内存
preemptedResourceVCores 抢占容器使用的虚拟核心数
numNonAMContainer抢占 int 抢占的标准容器数
numAMContainer抢占 int 抢占的应用程序主容器数
logAggregationStatus 字符串 日志聚合的状态 - 有效值是 LogAggregationStatus 枚举的成员:DISABLED、NOT_START、RUNNING、RUNNING_WITH_FAILURE、SUCCEEDED、FAILED、TIME_OUT
unmanaged应用程序 布尔 应用程序是否处于非托管状态。
appNodeLabelExpression 字符串 节点标签表达式,用于标识默认情况下应在其上运行应用程序容器的节点。
amNodeLabel表达式 字符串 节点标签表达式,用于标识应用程序的 AM 容器预期在其上运行的节点。

jq,是linux一个很方便的json处理工具

通俗的说就是一个能够接受json,处理json,输出json的程序,反正很好用。

安装起来也非常的方便,直接使用yum即可安装。linux下离线安装jq工具 - 代码天地 (codetd.com)

yum install jq

编写shell脚本

由于公司离线yarn和实时yarn 采用是分开的方式。
只需要监控实时yarn 任务有没有处于RUNNING,达到监控的目的
这里shell脚本也只记录,flink-on-yarn 这种部署方式任务监控
shell脚本水平有限,大家多多谅解,欢迎指导

shell脚本实现功能:
获取线运行job任务,记录到日志文件。下一次脚本调用时候读取日志文件,判断状态。
不是RUNNING,就告警同时重新记录日志。

#!/bin/bash

Joblist=`cat /opt/shell/logs/flink_job.log`    #获取记录job的log文件
let i=0  #获取任务数
let log_count=0  #获取日志中的任务数
start_count=RUNNING  #判断任务是否存在异常

############## 1 判断日志文件内容是否为空,为空时自动读取flink任务并记录到日志文件 #########
if [ -z "$Joblist" ]
then
	while :
	do
		job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

		if [ ${job_id[$i]} = "null" ];then
			break
		else
			echo ${job_id[$i]}
			echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
			let i++
		fi
	done
fi


############## 2 读取文件中JOB任务 ##################

let i=0
while read line
do
	JOB[$i]=$line
	let i++
done</opt/shell/logs/flink_job.log

log_count=$i #获取日志中的任务数


########### 3  判断任务状态,是否为RUNNIG,不是则邮件告警   ###############
for ((j=0;j<i;j++))
do
	JOB_ID=${JOB[$j]//\"}
	JOB_status=`curl -s http://XXXX:8088/ws/v1/cluster/apps/$JOB_ID  | jq .app.state`
	JOB_NAME=`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID  | jq .app.name`
	START=$[`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq  .app.startedTime` / 1000]

#	echo "JOB_NAME: "$JOB_NAME
#	echo 启动时间: `date -d @$START +"%F %H:%M:%S"`
#	echo "JOB_status: " ${JOB_status//\"}

#echo -e "【$JOB_NAME】 \n JOB_ID: $JOB_ID \n 启动时间: `date -d @$START +"%F %H:%M:%S"` \n 检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n 目前状态: $JOB_status"
#echo "=============================================="

	if [ ${JOB_status//\"} != "RUNNING" ];then
		SUBJECT="【异常告警】Flink任务异常"
		TEXT="Flink任务 【$JOB_NAME】 异常故障 \n\nJOB_ID: $JOB_ID\n\n启动时间: `date -d @$START +"%F %H:%M:%S"` \n\n检查时间: `date "+%Y-%m-%d %H:%M:%S"`  \n\n目前状态: $JOB_status"
		echo -e $TEXT | mail -s $SUBJECT     邮箱地址
		start_count=erron
	fi
done


########### 4  出现任务异常,重新读取job 任务记录到日志文件   ###############

let i=0
if [ $start_count == "erron" ];then


echo '重新写入日志文件'
	while :
	do
		job_id[$i]=`curl -s http://XXXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

		if [ ${job_id[$i]} = "null" ];then
			break
		elif  [ $i == 0 ]; then
			echo ${job_id[$i]}>/opt/shell/logs/flink_job.log

		else
			echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
		fi
		let i++
	done
	start_count=RUNNING
fi

########### 5  判断线上任务数是否一致,是否有新任务增加   ###############



let i=0
while :
do
	job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

	if [ ${job_id[$i]} = "null" ];then
		break
	else

		let i++
	fi
done
let count=$i #线上任务数
echo "==========================线上最新RUNNING状态任务数: "$count
echo "==========================日志RUNNING状态任务数: "$log_count



if [ ! $count -eq $log_count ]; then
	echo "现有RUNNING状态任务数不相等于已记录的任务数"
	echo  ${job_id[0]} >/opt/shell/logs/flink_job.log
	for ((i=1;i<count;i++))
	do
		echo "重新写入JOB: "${job_id[$i]}
		echo ${job_id[$i]}>> /opt/shell/logs/flink_job.log

	done

fi

echo "======================当前时间: `date "+%Y-%m-%d %H:%M:%S"`======================================="
echo  ================================================================================================
echo  =====================================本次crontab监控结束========================================
echo  ================================================================================================

Yarn REST API 使用指南-阿里云开发者社区

Apache Hadoop 3.0.1 – ResourceManager REST API。文章来源地址https://www.toymoban.com/news/detail-783418.html

到了这里,关于FlinkOnYarn 监控 flink任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 4.1、Flink任务怎样读取集合中的数据

    非并行数据源:         def fromElements [T: TypeInformation](data: T*): DataStream[T]         def fromCollection [T: TypeInformation](data: Seq[T]): DataStream[T]          def fromCollection [T: TypeInformation] (data: Iterator[T]): DataStream[T]  并行数据源:         def fromParallelCollection [T: TypeInformation] (dat

    2024年02月13日
    浏览(47)
  • 4.2、Flink任务怎样读取文件中的数据

    目录 1、前言 2、readTextFile(已过时,不推荐使用) 3、readFile(已过时,不推荐使用) 4、fromSource(FileSource) 推荐使用 思考: 读取文件时可以设置哪些规则呢?          1. 文件的格式(txt、csv、二进制...)                  2. 文件的分隔符(按n 分割)          3. 是否需

    2024年02月13日
    浏览(40)
  • 4.3、Flink任务怎样读取Kafka中的数据

    目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、latest() 5.3、timestamp() 6、Kafka分区

    2024年02月13日
    浏览(41)
  • Idea本地跑flink任务时,总是重复消费kafka的数据(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中执行任务时,没法看到JobManager的错误,以至于我以为是什么特殊的原因导致任务总是反复消费。在close方法中,增加日志,发现jdbc连接被关闭了。 重新消费,jdbc连接又启动了。 注意,在Flink的函数中,open和close方法

    2024年02月07日
    浏览(39)
  • Flink CEP完全指南:捕获数据的灵魂,构建智慧监控与实时分析大师级工具

    Flink CEP(Complex Event Processing)是 Apache Flink 的一个库,用于实现复杂的事件流处理和模式匹配。它可以用来识别事件流中的复杂模式和序列,这对于需要在实时数据流中进行模式识别的应用场景非常有用,比如监控、异常检测、业务流程管理等。 在Flink CEP中,你可以定义复杂

    2024年02月03日
    浏览(58)
  • ApacheStreamPark2.1.0部署及执行flink-cdc任务同步mysql表的数据到es的实践

    ApacheStreamPark是流处理极速开发框架,流批一体 湖仓一体的云原生平台,一站式流处理计算平台。   特性中的简单易用和文档详尽这两点我也是深有体会的,部署一点都不简单,照着官方文档都不一定能搞出来,下面部署环节慢慢来吐槽吧。   之前我们写 Flink SQL 基本上

    2024年02月11日
    浏览(48)
  • flink任务启动抛出mysql数据库连接过多异常message from server:“Too many connections“解决办法

    1. 异常现象 2. 现象分析 2.1 mysql数据库最大默认连接数是151 2.2 已用连接数超过了最大连接数导致的异常 3. 解决办法 3.1 修改mysql最大连接数配置文件 命令行修改最大连接数(max_connections),设置最大连接数为1000。 mysql set global max_connections = 1000; 这种方式有个问题,就是设置的最

    2024年02月14日
    浏览(54)
  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(51)
  • 【Flink】详解Flink任务提交流程

    通常我们会使用 bin/flink run -t yarn-per-job -c com.xxx.xxx.WordCount/WordCount.jar 方式启动任务;我们看一下 flink 文件中到底做了什么,以下是其部分源码 可以看到,第一步将相对地址转换成绝对地址;第二步获取 Flink 配置信息,这个信息放在 bin 目录下的 config. sh 中;第三步获取 JV

    2024年02月14日
    浏览(45)
  • 【Flink】Flink任务缺失Jobmanager日志的问题排查

    问题不是大问题,不是什么代码级别的高深问题,也没有影响任务运行,纯粹因为人员粗心导致,记录一下排查的过程。 一个生产环境的奇怪问题,环境是flink1.15.0 on yarn3.2.2的,研发人员反馈业务正常运行,但是最近变更算法替换新包的时候有业务异常,然后需要排查日志的

    2024年01月19日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包