Flink任务一般为实时不断运行的任务,如果没有任务监控,
任务异常时无法第一时间处理会比较麻烦。
这里通过调用API接口方式来获取参数,实现任务监控。
Flink任务监控(基于API接口编写shell脚本)
一 flink-on-yarn 模式
二 编写shell 脚本
监控集群指标
http://rm-http-address:port/ws/v1/cluster/metrics
响应正文
<clusterMetrics>
<appsSubmitted>**</appsSubmitted>
<appsCompleted>**</appsCompleted>
<appsPending>0</appsPending>
<appsRunning>**</appsRunning>
<appsFailed>**</appsFailed>
<appsKilled>**</appsKilled>
<reservedMB>0</reservedMB>
<availableMB>**</availableMB>
<allocatedMB>**</allocatedMB>
<pendingMB>0</pendingMB>
<reservedVirtualCores>0</reservedVirtualCores>
<availableVirtualCores>**</availableVirtualCores>
<allocatedVirtualCores>**</allocatedVirtualCores>
<pendingVirtualCores>0</pendingVirtualCores>
<containersAllocated>**</containersAllocated>
<containersReserved>0</containersReserved>
<containersPending>0</containersPending>
<totalMB>**</totalMB>
<totalVirtualCores>**</totalVirtualCores>
<utilizedMBPercent>53</utilizedMBPercent>
<utilizedVirtualCoresPercent>**</utilizedVirtualCoresPercent>
<rmSchedulerBusyPercent>0</rmSchedulerBusyPercent>
<totalNodes>**</totalNodes>
<lostNodes>0</lostNodes>
<unhealthyNodes>**</unhealthyNodes>
<decommissioningNodes>0</decommissioningNodes>
<decommissionedNodes>0</decommissionedNodes>
<rebootedNodes>0</rebootedNodes>
<activeNodes>**</activeNodes>
<shutdownNodes>0</shutdownNodes>
<totalAllocatedContainersAcrossPartition>0</totalAllocatedContainersAcrossPartition>
<crossPartitionMetricsAvailable>false</crossPartitionMetricsAvailable>
</clusterMetrics>
clusterMetrics 对象的元素
项目 | 数据类型 | 描述 |
---|---|---|
apps已提交 | int | 提交的申请数量 |
应用已完成 | int | 完成的申请数量 |
apps待定 | int | 待处理的申请数量 |
应用程序正在运行 | int | 正在运行的应用程序数 |
apps失败 | int | 失败的应用程序数 |
应用已杀死 | int | 被终止的应用程序数 |
保留MB | 长 | 预留的内存量(以 MB 为单位) |
可用MB | 长 | 可用内存量(以 MB 为单位) |
已分配MB | 长 | 分配的内存量(以 MB 为单位) |
总MB | 长 | 总内存量(以 MB 为单位) |
保留虚拟核心 | 长 | 保留的虚拟核心数 |
可用虚拟核心 | 长 | 可用虚拟核心数 |
分配的虚拟核心 | 长 | 分配的虚拟核心数 |
totalVirtualCores 虚拟核心数 | 长 | 虚拟核心总数 |
容器已分配 | int | 分配的容器数 |
容器保留 | int | 保留的容器数 |
容器挂起 | int | 待处理的容器数 |
总节点数 | int | 节点总数 |
活动节点 | int | 活动节点数 |
丢失节点 | int | 丢失的节点数 |
不健康的节点 | int | 不正常的节点数 |
停用节点 | int | 停用的节点数 |
已停用节点 | int | 停用的节点数 |
rebooted节点 | int | 重新启动的节点数 |
shutdown节点 | int | 关闭的节点数 |
获取所有application
curl -s http://XXX:8088/ws/v1/cluster/apps
获取 state值为 RUNNING 的application任务
curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING
获取这个任务单个信息
curl -s http://XXX:8088/ws/v1/cluster/apps/application_1619074605427_0063 |jq .app.state
请注意,根据安全设置,用户可能无法看到所有字段。
项目 数据类型 描述 编号 字符串 应用程序 ID 用户 字符串 启动应用程序的用户 名字 字符串 应用程序名称 队列 字符串 提交应用程序的队列 州 字符串 根据 ResourceManager 的应用程序状态 - 有效值是 YarnApplicationState 枚举的成员:NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED finalStatus 字符串 应用程序的最终状态(如果已完成)(由应用程序本身报告)有效值是 FinalApplicationStatus 枚举的成员:UNDEFINED、SUCCEEDED、FAILED、KILLED 进展 浮 以百分比表示的申请进度 trackingUI 字符串 跟踪 URL 当前指向的位置 - 历史记录(用于历史记录服务器)或 ApplicationMaster trackingUrl 字符串 可用于跟踪应用程序的 Web URL 诊断 字符串 详细的诊断信息 clusterId 长 集群 ID 应用程序类型 字符串 应用程序类型 application标签 字符串 应用程序的逗号分隔标记 优先权 字符串 所提交申请的优先权 开始时间 长 应用程序启动的时间(自纪元以来的毫秒) 完成时间 长 应用程序完成的时间(以纪元以来的毫秒数为单位) elapsedTime 长 自应用程序启动以来经过的时间(以毫秒为单位) amContainer日志 字符串 应用程序主容器日志的 URL amHostHttp地址 字符串 应用程序主机的节点 http 地址 amRPCAddress 字符串 应用程序主机的 RPC 地址 已分配MB int 分配给应用程序正在运行的容器的内存总和(以 MB 为单位) 已分配VCores int 分配给应用程序正在运行的容器的虚拟核心的总和 running容器 int 当前为应用程序运行的容器数 memorySeconds 长 应用程序分配的内存量(兆字节-秒) vcore秒数 长 应用程序分配的 CPU 资源量(虚拟内核 - 秒) queueUsagePercentage 浮 应用正在使用的队列资源的百分比 clusterUsage百分比 浮 应用正在使用的群集资源的百分比。 抢占ResourceMB 长 抢占式容器使用的内存 preemptedResourceVCores 长 抢占容器使用的虚拟核心数 numNonAMContainer抢占 int 抢占的标准容器数 numAMContainer抢占 int 抢占的应用程序主容器数 logAggregationStatus 字符串 日志聚合的状态 - 有效值是 LogAggregationStatus 枚举的成员:DISABLED、NOT_START、RUNNING、RUNNING_WITH_FAILURE、SUCCEEDED、FAILED、TIME_OUT unmanaged应用程序 布尔 应用程序是否处于非托管状态。 appNodeLabelExpression 字符串 节点标签表达式,用于标识默认情况下应在其上运行应用程序容器的节点。 amNodeLabel表达式 字符串 节点标签表达式,用于标识应用程序的 AM 容器预期在其上运行的节点。
jq,是linux一个很方便的json处理工具
通俗的说就是一个能够接受json,处理json,输出json的程序,反正很好用。
安装起来也非常的方便,直接使用yum即可安装。linux下离线安装jq工具 - 代码天地 (codetd.com)
yum install jq
编写shell脚本
由于公司离线yarn和实时yarn 采用是分开的方式。
只需要监控实时yarn 任务有没有处于RUNNING,达到监控的目的
这里shell脚本也只记录,flink-on-yarn 这种部署方式任务监控
shell脚本水平有限,大家多多谅解,欢迎指导shell脚本实现功能:
获取线运行job任务,记录到日志文件。下一次脚本调用时候读取日志文件,判断状态。
不是RUNNING,就告警同时重新记录日志。
#!/bin/bash
Joblist=`cat /opt/shell/logs/flink_job.log` #获取记录job的log文件
let i=0 #获取任务数
let log_count=0 #获取日志中的任务数
start_count=RUNNING #判断任务是否存在异常
############## 1 判断日志文件内容是否为空,为空时自动读取flink任务并记录到日志文件 #########
if [ -z "$Joblist" ]
then
while :
do
job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
if [ ${job_id[$i]} = "null" ];then
break
else
echo ${job_id[$i]}
echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
let i++
fi
done
fi
############## 2 读取文件中JOB任务 ##################
let i=0
while read line
do
JOB[$i]=$line
let i++
done</opt/shell/logs/flink_job.log
log_count=$i #获取日志中的任务数
########### 3 判断任务状态,是否为RUNNIG,不是则邮件告警 ###############
for ((j=0;j<i;j++))
do
JOB_ID=${JOB[$j]//\"}
JOB_status=`curl -s http://XXXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.state`
JOB_NAME=`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.name`
START=$[`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.startedTime` / 1000]
# echo "JOB_NAME: "$JOB_NAME
# echo 启动时间: `date -d @$START +"%F %H:%M:%S"`
# echo "JOB_status: " ${JOB_status//\"}
#echo -e "【$JOB_NAME】 \n JOB_ID: $JOB_ID \n 启动时间: `date -d @$START +"%F %H:%M:%S"` \n 检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n 目前状态: $JOB_status"
#echo "=============================================="
if [ ${JOB_status//\"} != "RUNNING" ];then
SUBJECT="【异常告警】Flink任务异常"
TEXT="Flink任务 【$JOB_NAME】 异常故障 \n\nJOB_ID: $JOB_ID\n\n启动时间: `date -d @$START +"%F %H:%M:%S"` \n\n检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n\n目前状态: $JOB_status"
echo -e $TEXT | mail -s $SUBJECT 邮箱地址
start_count=erron
fi
done
########### 4 出现任务异常,重新读取job 任务记录到日志文件 ###############
let i=0
if [ $start_count == "erron" ];then
echo '重新写入日志文件'
while :
do
job_id[$i]=`curl -s http://XXXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
if [ ${job_id[$i]} = "null" ];then
break
elif [ $i == 0 ]; then
echo ${job_id[$i]}>/opt/shell/logs/flink_job.log
else
echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
fi
let i++
done
start_count=RUNNING
fi
########### 5 判断线上任务数是否一致,是否有新任务增加 ###############
let i=0
while :
do
job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
if [ ${job_id[$i]} = "null" ];then
break
else
let i++
fi
done
let count=$i #线上任务数
echo "==========================线上最新RUNNING状态任务数: "$count
echo "==========================日志RUNNING状态任务数: "$log_count
if [ ! $count -eq $log_count ]; then
echo "现有RUNNING状态任务数不相等于已记录的任务数"
echo ${job_id[0]} >/opt/shell/logs/flink_job.log
for ((i=1;i<count;i++))
do
echo "重新写入JOB: "${job_id[$i]}
echo ${job_id[$i]}>> /opt/shell/logs/flink_job.log
done
fi
echo "======================当前时间: `date "+%Y-%m-%d %H:%M:%S"`======================================="
echo ================================================================================================
echo =====================================本次crontab监控结束========================================
echo ================================================================================================
Yarn REST API 使用指南-阿里云开发者社区文章来源:https://www.toymoban.com/news/detail-783418.html
Apache Hadoop 3.0.1 – ResourceManager REST API。文章来源地址https://www.toymoban.com/news/detail-783418.html
到了这里,关于FlinkOnYarn 监控 flink任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!