Jenkins 安装的插件
插件名称 | 作用 |
---|---|
Rebuilder | |
-
Rebuilder。
官方地址:https://plugins.jenkins.io/rebuild
安装方式:在Jenkins插件当中直接搜索即可安装。
功能说明:此插件可以直接重复上次构建,也可以用于查看一些参数比较复杂的构建时,上次构建所选的参数是什么。非常nice的一个插件。 -
AnsiColor。
官方地址:https://plugins.jenkins.io/ansicolor
安装方式:在Jenkins插件当中直接搜索即可安装。
功能说明:扩展支持我们在shell当中定义的echo -e指令,从而给一定的输出上颜色。
使用方式:点此跳转到使用介绍。 -
Maven Release Plug-in。
maven风格插件。
安装方式:在Jenkins插件当中直接搜索即可安装。 -
user build vars。
官方地址:https://wiki.jenkins.io/display/JENKINS/Build+User+Vars+Plugin
安装方式:在Jenkins插件当中直接搜索即可安装。
功能说明:通过此插件,让整个Jenkins系统中的用户参数成为一个可调用的变量。
使用方式:在构建环境中选中Set Jenkins user build variables。 -
Post build task
功能说明:此功能允许您关联 shell 或批处理脚本,这些脚本根据构建日志输出在 Hudson
上执行某些任务。如果日志文本与构建日志文件中的某处匹配,脚本将执行。例如:如果在日志文本中指定了“IOException”,并且构建日志文件包含字符串“IOException”,则脚本将执行。
允许使用 Java 正则表达式,并且可以将组用作脚本参数。如果文本是“Last Build : #(\d+)”并且脚本是“script.sh”,那么如果日志包含一行“Last
Build : #4”,脚本“script.sh 4”将被调用. -
MultiJob Phase
功能说明:上下游执行
发送消息到飞书
预览 1 (单Job)
-
示例图
-
对应shell
#!/usr/bin/env bash
# Send the Jenkins build notification to every configured Feishu bot webhook.
url1="https://open.feishu.cn/open-apis/bot/v2/hook/"
url2="https://open.feishu.cn/open-apis/bot/v2/hook/"
# 1. Webhook addresses that receive the message.
#    Quoted so each URL stays exactly one array element (no word splitting
#    or pathname expansion on characters such as "?" or "*").
webhook_list=("$url1" "$url2")
# ========================
# 2. Message parameter pre-processing
# ========================
# 3. Send the message
# Copies the sender script from $HOME into the workspace, then invokes it
# once per webhook with the current job URL.
run_send_msg() {
  echo -e "\n 复制发送消息脚本 $HOME -> ${WORKSPACE}"
  # Abort when the copy fails instead of going on to run a script that
  # may not be there.
  cp "$HOME"/send_msg_to_feishu.py "${WORKSPACE}" || return 1
  local webhook
  for webhook in "${webhook_list[@]}"; do
    echo -e "发送中 --> $webhook"
    python3 send_msg_to_feishu.py "${webhook}" -job_url "${JOB_URL}"
  done
  echo -e "发送完成 \n\n"
}
run_send_msg
- 对应python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import subprocess
import time
import datetime
import requests
# Command-line interface: positional webhook URL plus the Jenkins job URL.
parser = argparse.ArgumentParser(description='Jenkins发送消息到飞书')
parser.add_argument('webhook', help='机器人webhookURL')
parser.add_argument('-job_url', '--JOB_URL', help='作业URL', required=True)
args = parser.parse_args()
WEBHOOK = args.webhook
JOB_URL = args.JOB_URL
# Drop any trailing slash before appending so the result never contains
# "//lastBuild" when the caller passes a job URL that already ends in "/".
BUILD_URL = JOB_URL.rstrip('/') + '/lastBuild'
def run_curl():
    """Fetch the job's JSON description from the Jenkins remote API.

    Uses ``requests`` rather than shelling out to ``curl``.

    Returns:
        The ``requests.Response`` for ``<JOB_URL>/api/json``, or ``None``
        when the request could not be completed.
    """
    # Normalise the separator so the URL is valid whether or not JOB_URL
    # ends with a slash.
    url = JOB_URL.rstrip('/') + '/api/json'
    try:
        # Without a timeout requests waits indefinitely on a dead host.
        return requests.get(url, timeout=10)
    except requests.exceptions.RequestException:
        print(f"Error accessing {JOB_URL}")
        return None
def check_network():
    """Report whether an HTTP GET to the webhook URL succeeds.

    Returns:
        bool: ``response.ok`` of the GET to ``WEBHOOK``; ``False`` when the
        request raises a ``requests`` exception (including a timeout).
    """
    try:
        # Bounded wait so an unreachable network cannot stall the build step.
        return requests.get(WEBHOOK, timeout=10).ok
    except requests.exceptions.RequestException:
        return False
def sending_alarms(text=None):
    """Post a plain-text alarm message to the webhook.

    Args:
        text: Alarm body. When ``None``, a default "network unreachable"
            message containing ``BUILD_URL`` and the output of
            ``ip route show`` is sent instead.
    """
    if text is None:
        # Only query the routing table when the default message needs it.
        local_network_info = subprocess.getoutput("ip route show")
        text = f"网络异常: 无法访问\n{BUILD_URL}\n{local_network_info}"
    payload_message = {"msg_type": "text", "content": {"text": f"告警信息\n{text}"}}
    headers = {"Content-Type": "application/json"}
    res = requests.post(url=WEBHOOK, data=json.dumps(payload_message), headers=headers, timeout=10)
    print(f"\t告警信息发送状态:{res.text}")
def get_build_info():
    """Query ``<BUILD_URL>/api/json`` and extract the fields used in the card.

    Returns:
        tuple: ``(shortDescription, buildingDurationMillis, GIT_BRANCH,
        build_duration, fullDisplayName, BUILD_STATUS, timestamp, JOB_URL,
        BUILD_DISPLAY_NAME, JOB_NAME)``. ``BUILD_STATUS`` is whatever the
        API reports under ``result`` and may be ``None``.

    Raises:
        SystemExit: With code 2, after sending an alarm, when the response
            cannot be decoded or lacks the expected structure.
    """
    result = requests.get(f'{BUILD_URL}/api/json', timeout=10)
    # Defaults for values that come from optional actions; without them the
    # return statement raises UnboundLocalError when an action is missing.
    shortDescription = ''
    buildingDurationMillis = 0
    GIT_BRANCH = ''
    try:
        result = result.json()
        for action in result['actions']:
            if not action:  # skip empty placeholder entries
                continue
            action_class = action.get('_class')
            if action_class == 'hudson.model.CauseAction':  # who/what started the build
                shortDescription = action['causes'][0]['shortDescription']
                print('\tshortDescription --> ', shortDescription)
            elif action_class == 'jenkins.metrics.impl.TimeInQueueAction':  # build time in ms
                buildingDurationMillis = action['buildingDurationMillis']
                print('\tbuildingDurationMillis --> ', buildingDurationMillis)
            elif action_class == 'hudson.plugins.git.util.BuildData':  # git information
                # Keeps the last branch name listed, as the original loop did.
                for branch_name in action['buildsByBranchName']:
                    GIT_BRANCH = branch_name
                print('\tGIT_BRANCH --> ', GIT_BRANCH)
    except (IndexError, KeyError, ValueError):
        sending_alarms('jenkins 获取数据异常')
        # SystemExit works even when the site-provided exit() is unavailable.
        raise SystemExit(2)
    if result['duration'] != 0:
        # Duration reported by the API, in milliseconds.
        duration = int(result['duration']) // 1000
        minutes, seconds = divmod(duration, 60)
        hours, minutes = divmod(minutes, 60)
        build_duration = f'{hours}h {minutes}m {seconds}s'
        print(f"\t--> 通过响应值换算{build_duration}\n")
    else:
        # A zero duration is treated as "not available": derive the elapsed
        # time from the start timestamp instead.
        build_duration = datetime.datetime.now() - datetime.datetime.fromtimestamp(result['timestamp'] / 1000)
        build_duration = str(build_duration).split('.')[0]
        print(f"\t--> 通过当前时间计算耗时{build_duration}\n")
    fullDisplayName = result['fullDisplayName']
    BUILD_STATUS = result['result']  # build status
    print('\tbuild_status --> ', BUILD_STATUS)
    # Build start time, formatted in local time.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result['timestamp'] / 1000))
    print(f'\ttimestamp --> {timestamp}')
    JOB_URL = result['url']
    BUILD_DISPLAY_NAME = result['displayName']
    JOB_NAME = fullDisplayName
    return (shortDescription, buildingDurationMillis, GIT_BRANCH, build_duration, fullDisplayName,
            BUILD_STATUS, timestamp, JOB_URL, BUILD_DISPLAY_NAME, JOB_NAME)
def set_msg():
    """Build the single-job status card and post it to the Feishu webhook.

    Reads the build data through ``get_build_info()``, picks a header colour
    from the build status, and sends an interactive card containing the
    trigger time, branch, build number, status and a report link.
    """
    (shortDescription, _building_millis, GIT_BRANCH, _build_duration, _full_display_name,
     BUILD_STATUS, timestamp, JOB_URL, BUILD_DISPLAY_NAME, JOB_NAME) = get_build_info()
    # The status can be None (no result reported yet); a membership test on
    # None raises TypeError, so compare against an empty string instead.
    status_text = BUILD_STATUS or ""
    if "SUCCESS" in status_text:
        template_color = "green"
    elif "FAILURE" in status_text:
        template_color = "red"
    elif "ABORTED" in status_text:
        template_color = "yellow"
    else:
        template_color = "grey"
    # ============ card template ============
    card = json.dumps({
        "config": {
            "wide_screen_mode": True
        },
        "elements": [
            {
                "tag": "markdown",
                "content": f"触发时间:{timestamp}\n"
                           f"分支名称:{GIT_BRANCH}\n"
                           f"构建编号:{BUILD_DISPLAY_NAME}\n"
                           f"构建状态:<font color={template_color}>{BUILD_STATUS}</font>\n"
            },
            {
                "tag": "note",
                "elements": [
                    {
                        "tag": "img",
                        "img_key": f"{img_icon}",
                        "alt": {
                            "tag": "plain_text",
                            "content": f"{JOB_URL}"
                        }
                    },
                    {
                        "tag": "plain_text",
                        "content": f"{shortDescription}"
                    }
                ]
            },
            {
                "tag": "hr"
            },
            {
                "tag": "action",
                "actions": [
                    {
                        "tag": "button",
                        "text": {
                            "tag": "plain_text",
                            "content": "报告链接"
                        },
                        "type": "primary",
                        "multi_url": {
                            "url": f"{BUILD_URL}/allure",
                            "pc_url": "",
                            "android_url": "",
                            "ios_url": ""
                        }
                    }
                ],
                "layout": "bisected"
            }
        ],
        "header": {
            "template": f"{template_color}",
            "title": {
                "content": f"Jenkins 构建状态报告: {JOB_NAME}",
                "tag": "plain_text"
            }
        }
    })
    # The card is embedded as a JSON string, exactly as the original did.
    body = json.dumps({"msg_type": "interactive", "card": card})
    headers = {"Content-Type": "application/json"}
    res = requests.post(url=WEBHOOK, data=body, headers=headers, timeout=10)
    print(f'发送响应 --> {res.text}')
if __name__ == '__main__':
    # img_key returned by Feishu after uploading the Jenkins icon.
    img_icon = 'img_v2_041b28e3-5680-48c2-9af2-497ace79333g'
    # Probe once and reuse the answer; the original issued the same request
    # twice (once for the log line, once for the check).
    online = check_network()
    print(f"互联网状态:{online}")
    if not online:
        text = "无法访问互联网"
        print(text)
        raise SystemExit(2)
    set_msg()
预览 2 (多Job,概览)[1]
Jenkins 需安装
Multijob
插件
Multijob https://plugins.jenkins.io/jenkins-multijob-plugin/
-
对应shell
#!/usr/bin/env bash
echo -e "\n\n 消息处理"
# ========================
# Message sending
# ========================
# 1. Webhook addresses that receive the message
# -----------------------
group='https://open.feishu.cn/open-apis/bot/v2/hook/'
# Quoted so the URL stays a single array element.
webhook_list=("$group")
py_send='SendMsgFeishu.py'
# ========================
# 2. File handling
# -----------------------
echo -e "\n 复制发送消息脚本 $HOME -> ${WORKSPACE}" && cp "$HOME"/"$py_send" "${WORKSPACE}"
# ========================
# 3. Send the message
# -----------------------
# Invokes the sender script once per configured webhook.
run_send_msg() {
  for ((i = 0; i < ${#webhook_list[@]}; i++)); do
    webhook=${webhook_list[$i]}
    echo -e "发送中 --> $webhook"
    python3 "$py_send" "${webhook}" -job_url "${JOB_URL}"
  done
  echo -e "发送完成 \n\n"
}
run_send_msg
-
对应python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import datetime
import json
import subprocess
import time
from json import JSONDecodeError
import requests
# Command-line interface: positional webhook URL plus the Jenkins job URL.
parser = argparse.ArgumentParser(description='Jenkins 发送消息到飞书',
                                 epilog="执行示例>>> python ${webhook} -job_url ${JOB_URL}")
parser.add_argument('webhook', help='机器人webhookURL')  # required
parser.add_argument('-job_url', '--JOB_URL', help='作业URL', required=True, )  # required
# Parse once and reuse the namespace instead of re-parsing per attribute.
args = parser.parse_args()
webhook = args.webhook
JOB_URL = args.JOB_URL
# Drop any trailing slash first so the result never contains "//lastBuild".
BUILD_URL = JOB_URL.rstrip('/') + '/lastBuild'
# Per-sub-job card cells, filled by set_msg().
job_name = []  # job names
job_duration = []  # job durations
job_status = []  # job statuses
job_url = []  # report links
pass_rate = []  # pass-rate text
print('修改时间:2023-07-04 10:02:43')
print(BUILD_URL.split('/')[2])
def get_jenkins_plugin():
    """Return the installed-plugin list from the Jenkins plugin manager API.

    Returns:
        list: The ``plugins`` array of
        ``<jenkins root>/pluginManager/api/json?depth=1``.
    """
    # Take everything before the first "/job/" as the Jenkins root so the
    # scheme (http or https) of BUILD_URL is preserved rather than forced
    # to "http://".
    url = f"{BUILD_URL.split('/job/')[0]}/pluginManager/api/json?depth=1"
    print(url)
    plugin_list = requests.get(url, auth=jenkins_auth, timeout=30).json()['plugins']
    print(f"Jenkins插件安装数量 {len(plugin_list)}")
    return plugin_list
def get_base_info():
    """Collect and print host details by running shell commands.

    Returns:
        tuple: ``(device_id, version_os, version_browser, device_sn)`` —
        the raw command outputs; ``device_sn`` is the text after the last
        ``-`` in the hostname.
    """
    device_id = subprocess.getoutput('cat /etc/ding_issue')
    version_os = subprocess.getoutput('cat /etc/issue')
    # Fixed the binary name (was "google-chrom-stable", which never resolves).
    version_browser = subprocess.getoutput('google-chrome-stable --version')
    device_sn = subprocess.getoutput('hostname').split('-')[-1]
    network_status = subprocess.getoutput("networkctl status")
    network_route = subprocess.getoutput("ip route show")
    print("设备信息".center(20, '-'))
    # The serial-number label previously printed version_os by mistake.
    print("\n设备唯一标识码", device_id, "\n设备序列号", device_sn)
    print("\n系统版本", version_os, "\n浏览器版本", version_browser, )
    print("\n本地网络信息", network_status, "\n网络优先级(值小优先)", network_route)
    return device_id, version_os, version_browser, device_sn,
def sending_alarms(text=None):
    """Post a plain-text alarm message to the webhook.

    Args:
        text: Alarm body. When ``None``, a default "network unreachable"
            message containing ``BUILD_URL`` and the output of
            ``ip route show`` is sent instead.
    """
    if text is None:
        # Only query the routing table when the default message needs it.
        local_network_info = subprocess.getoutput("ip route show")
        text = f"网络异常: 无法访问\n{BUILD_URL}\n{local_network_info}"
    payload_message = {"msg_type": "text", "content": {"text": f"告警信息\n{text}"}}
    headers = {"Content-Type": "application/json"}
    res = requests.post(url=webhook, data=json.dumps(payload_message), headers=headers, timeout=10)
    print(f"告警信息发送状态:{res.text}")
def _column(weight, elements, vertical_align=None):
    """Build one weighted card column; ``vertical_align`` is omitted when None."""
    column = {"tag": "column", "width": "weighted", "weight": weight}
    if vertical_align is not None:
        column["vertical_align"] = vertical_align
    column["elements"] = elements
    return column


def _centered(content):
    """Build a centre-aligned markdown card element."""
    return {"tag": "markdown", "content": content, "text_align": "center"}


def _ratio(passed, total):
    """Return ``passed / total``, or 0 when ``total`` is 0."""
    return passed / total if total != 0 else 0


def set_msg():
    """Collect MultiJob sub-build results and post an overview card to Feishu.

    Reads ``<BUILD_URL>/api/json``, then for every entry in ``subBuilds``
    fetches the Allure ``summary.json`` and ``history-trend.json`` widgets to
    compute the current and previous pass rates. The per-job cells are
    appended to the module-level lists (``job_name``, ``job_duration``,
    ``job_status``, ``job_url``, ``pass_rate``) and rendered into the card.

    Raises:
        SystemExit: After sending an alarm, when the build data cannot be
            fetched or decoded.
    """
    # Jenkins root URL with a trailing slash. Splitting on "/job/" rather
    # than the bare substring "job" keeps hosts such as "jobs.example.com"
    # intact.
    JENKINS_URL = BUILD_URL.split('/job/')[0] + '/'
    try:
        get_result = requests.get(f'{BUILD_URL}/api/json', auth=jenkins_auth, timeout=30)
        print(f"数据查询API地址:{get_result.url}")
        result = get_result.json()
    except Exception as e:  # report any fetch/decode failure, then stop
        text = f"发生异常: {type(e).__name__} --> {str(e)}\n检查:{BUILD_URL}/api/json"
        sending_alarms(text)
        raise SystemExit(text)
    # Take the first action that carries "causes" instead of assuming it is
    # actions[0]; other actions may precede it in the list.
    shortDescription = next(
        (action['causes'][0]['shortDescription'] for action in result['actions']
         if action and action.get('causes')),
        '')
    # Build start time, formatted in local time.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result['timestamp'] / 1000))
    if result['duration'] != 0:
        # Duration reported by the API, in milliseconds.
        duration = int(result['duration']) // 1000
        minutes, seconds = divmod(duration, 60)
        hours, minutes = divmod(minutes, 60)
        build_duration = f'{hours}h {minutes}m {seconds}s'
        print(f"--> 通过响应值换算{build_duration}\n")
    else:
        # Zero duration: derive the elapsed time from the start timestamp.
        build_duration = datetime.datetime.now() - datetime.datetime.fromtimestamp(result['timestamp'] / 1000)
        build_duration = str(build_duration).split('.')[0]
        print(f"--> 通过当前时间计算耗时{build_duration}\n")
    total_count = len(result['subBuilds'])
    print(f'======= 项目概览 ======= \n'
          f'shortDescription:{shortDescription}\nbuild duration:{build_duration}\ntotal_count:{total_count}\ntimestamp:{timestamp}\n')
    print('提示: 没有allure就报错 无法运行 JSONDecodeError')
    for index in result['subBuilds']:
        report_url = f"{JENKINS_URL}{index['url']}/allure"
        try:
            # Same credentials as the build query; without them a secured
            # Jenkins answers with a login page and JSON decoding fails.
            summary_response = requests.get(f"{report_url}/widgets/summary.json", auth=jenkins_auth, timeout=30)
            trend_response = requests.get(f"{report_url}/widgets/history-trend.json", auth=jenkins_auth, timeout=30)
            allure_summary = summary_response.json()['statistic']
            trend = trend_response.json()
            try:
                # Entry 1 is used as the previous run.
                allure_history_trend = trend[1]
            except IndexError:
                print('没有历史数据')
                allure_history_trend = trend[0]
            allure_pass_rate = _ratio(allure_summary['passed'], allure_summary['total'])
            allure_history = _ratio(allure_history_trend['data']['passed'],
                                    allure_history_trend['data']['total'])
        except Exception as e:
            text = f"解析Allure数据异常,请检查\n发生异常: {type(e).__name__} --> {str(e)}\n检查:{BUILD_URL}/api/json"
            sending_alarms(text)
            allure_pass_rate = allure_history = 0
            allure_summary = {"total": 0, "passed": 0, }
        # ------ styling ------
        color = {"SUCCESS": "green", "FAILURE": "red", "ABORTED": "yellow"}.get(index['result'], "grey")
        # Compare this run's pass rate with the previous one.
        allure_change = allure_pass_rate - allure_history
        print(f"{index['jobName']} --> 本次比上次通过率 {allure_change}")
        if allure_pass_rate > allure_history:
            allure_pass_rate = f'<font color=green>↑{allure_pass_rate:.2%}</font>'
        elif allure_pass_rate < allure_history or allure_pass_rate == 0:
            allure_pass_rate = f'<font color=red>↓{allure_pass_rate:.2%}</font>'
        else:
            allure_pass_rate = f' {allure_pass_rate:.2%}'
        # ------ collect the card cells ------
        job_name.append(_centered(f"{index['jobName']}"))
        job_duration.append(_centered(f"{index['duration']}"))
        job_status.append(_centered(f"<font color={color}>{index['result']}</font>"))
        job_url.append(_centered(f"[查看]({report_url})"))
        pass_rate.append(_centered(f"{allure_summary['passed']}/{allure_summary['total']}{allure_pass_rate} "))
    print(f'======= 项目详情 ======= \n{job_name}\n{job_duration}\n{job_status}\n{pass_rate}\n{job_url}\n')
    # ------ card template ------
    overview_cells = (
        f"项目数量\n**{total_count}**\n",
        f"运行耗时\n**{build_duration}**",
        f"执行时间\n**{timestamp}**\n",
    )
    card = json.dumps({
        "elements": [
            {"tag": "markdown", "content": "**项目总览**\n"},
            {
                "tag": "column_set",
                "flex_mode": "bisect",
                "background_style": "grey",
                "horizontal_spacing": "default",
                "columns": [
                    _column(1, [{"tag": "markdown", "text_align": "center", "content": cell}])
                    for cell in overview_cells
                ]
            },
            {"tag": "markdown", "content": "**项目信息**"},
            {
                # Header row of the per-job table.
                "tag": "column_set",
                "flex_mode": "none",
                "background_style": "grey",
                "columns": [
                    _column(2, [_centered("**项目名**")], "top"),
                    _column(1, [_centered("**运行时长**")], "top"),
                    _column(1, [_centered("**运行状态**")], "top"),
                    _column(1, [_centered("**passed/total/通过率**")], "top"),
                    _column(1, [_centered("**Report**")], "top"),
                ]
            },
            {
                # Data rows: one list of cells per column.
                "tag": "column_set",
                "flex_mode": "none",
                "background_style": "default",
                "columns": [
                    _column(2, job_name, "top"),
                    _column(1, job_duration, "center"),
                    _column(1, job_status, "top"),
                    _column(1, pass_rate, "top"),
                    _column(1, job_url, "top"),
                ]
            },
            {"tag": "hr"},
            {
                "tag": "note",
                "elements": [
                    {
                        "tag": "img",
                        "img_key": f"{img_icon}",
                        "alt": {"tag": "plain_text", "content": ""}
                    },
                    {
                        "tag": "plain_text",
                        "content": f"{shortDescription}\n颜色代表对比上次执行,绿色上升,红色下降"
                    }
                ]
            }
        ]
    })
    # Sample payload for the alternative "preset template" mode (see the
    # commented-out body below); not sent by default.
    card_id = json.dumps({
        "type": "template",
        "data": {
            "template_id": "ctp_AA6DZMfkJekh",  # card id, required; copy it from the card builder
            "template_variable": {  # values for variables bound in the card; optional
                "total_count": "29",
                "group_table": [
                    {"jobName": "test001", "duration": "小于1小时", "build_url": "baidu.com",
                     "build_status": "SUCCESS", "tmp_color": "green"},
                    {"jobName": "test002", "duration": "2小时", "build_url": "baidu.com",
                     "build_status": "FAILURE", "tmp_color": "red"},
                    {"jobName": "test003", "duration": "3小时", "build_url": "baidu.com",
                     "build_status": "ABORTED", "tmp_color": "yellow"},
                    {"jobName": "test004", "duration": "3小时", "build_url": "baidu.com",
                     "build_status": "UNSTABLE", "tmp_color": "grey"},
                ],
                "duration": "15080",
                "shortDescription": "Started by user admin",
                "timestamp": "1686645721264",
                "jobName": "",
                "tmp_color": ""
            }
        }
    })
    body = json.dumps({"msg_type": "interactive", "card": card})  # use the card built above
    # body = json.dumps({"msg_type": "interactive", "card": card_id})  # use the preset template
    headers = {"Content-Type": "application/json"}
    res = requests.post(url=webhook, data=body, headers=headers, timeout=30)
    print(f'消息发送响应 -->\n\t {res.text}')
if __name__ == '__main__':
    import os  # local import: only needed for the credential lookup below

    # img_key of the Jenkins icon uploaded to Feishu.
    img_icon = 'img_v2_098e80ae-e583-4148-b822-f42a05298d3g'
    # Jenkins (user, password). The JENKINS_USER / JENKINS_PASSWORD
    # environment variables override the built-in values so real
    # credentials need not be stored in the script.
    jenkins_auth = (os.environ.get('JENKINS_USER', 'admin'),
                    os.environ.get('JENKINS_PASSWORD', '1'))
    # get_base_info()
    ################################
    # ## Option 1: send the message directly.
    set_msg()
    ################################
    ################################
    # ## Option 2: verify that the Multijob plugin is installed first.
    # deviceid, os_version, browser_version, sn, = get_base_info()
    # print("设备信息".center(20, '*'), deviceid, os_version, browser_version, sn, )
    # if 'Multijob' in str(get_jenkins_plugin()):  # check that the plugin exists
    #     print(True)
    #     # set_msg()
    # else:
    #     quit('插件不存在,无法执行')
    ################################
GitLab 根据 labels 统计 issue
需要创建个人令牌(token)。
文章来源:https://www.toymoban.com/news/detail-672755.html
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import argparse
import csv
import datetime
import os
import time
"""
API文档
https://docs.gitlab.cn/jh/api/api_resources.html
"""
try:
import pandas as pd
import requests
except ImportError:
print("依赖缺失 安装依赖...")
os.system("pip install requests pandas")
import pandas as pd
import requests
# Command-line interface for the GitLab issue export.
parser = argparse.ArgumentParser(description='gitlab BUG列表查询',
                                 epilog="python get_gitlab_bug.py -t {token} -gid {$gid} -begin 2023-09-01 -status all "
                                        "-labels 'bug solution::fixed' -grep {file}")
parser.add_argument('-t', '--token', help='个人令牌', required=True)  # required
group = parser.add_mutually_exclusive_group()
group.add_argument('-gid', '--group_id', help='群组ID')
group.add_argument('-pid', '--project_id', help='项目ID')
parser.add_argument('-labels', '--labels', default='bug', help='可自定义输入,快捷标签(bug, test_request, bug_fix)')
parser.add_argument('-status', '--labels_status', default='opened', choices=['all', 'closed', 'opened'],
                    help='标签状态')
parser.add_argument('-begin', '--created_after', help='指定创建时间之后', )
parser.add_argument('-end', '--created_before', help='指定创建时间之前', )
parser.add_argument('-sort', '--sort', default='desc', choices=['desc', 'asc'],
                    help='按照 asc 或 desc 的顺序返回议题。默认值是 desc', )
parser.add_argument('-grep', '--grep_author', help='匹配issue作者', )
# Parse once and reuse the namespace instead of re-parsing per attribute.
args = parser.parse_args()
token = args.token
labels = args.labels
labels_status = args.labels_status
group_id = args.group_id
project_id = args.project_id
created_after = args.created_after
created_before = args.created_before
sort = args.sort
grep_author = args.grep_author
# One session so every request carries the private token header.
session = requests.session()
session.headers.update({'PRIVATE-TOKEN': token})
base = "http://gitlab.example.com/api/v4/"
issue_list = []  # fetched issues
list_id = []  # project ids seen in the fetched issues
project_info = {}  # project id -> project name
# Output file name; uses the label exactly as typed on the command line.
write_name = f"export_{labels_status}-{labels}.{time.strftime('%Y-%m-%d', time.localtime())}.csv"
print(write_name)
# Endpoints: group scope when a group id is given, otherwise project scope.
if group_id is not None:
    url_get_labels = f"{base}groups/{group_id}/labels"  # group labels
    url_get_issue = f"{base}groups/{group_id}/issues"  # group issues
    url_get_count = f"{base}groups/{group_id}/issues_statistics"  # group statistics
elif project_id is not None:
    url_get_labels = f"{base}projects/{project_id}/labels"  # project labels
    url_get_issue = f"{base}projects/{project_id}/issues"  # project issues
    url_get_count = f"{base}projects/{project_id}/issues_statistics"  # project statistics
else:
    print(f"project --> {project_id}\t group --> {group_id}")
    print("请输入 project_id 或 group_id")
    raise SystemExit()
# Expand shortcut names to the real label text; anything else is used as-is.
label_shortcuts = {
    "test_request": "Test Request",
    "release_request": "Release Request",
    "bug_fix": "bug solution::fixed",
}
labels = label_shortcuts.get(labels, labels or "bug")
# Query parameters: labels, scope, page size and sort order. The -sort value
# was parsed but never sent before; it is captured here because the name
# `sort` is rebound by the sort() function defined later in this file.
querystring = {"labels": labels, "scope": "all", "per_page": "100", "sort": sort}
# Optional creation-time window.
if created_after is not None:
    begin_time = f"{created_after}T08:00:00Z"
    querystring.update({"created_after": begin_time})
if created_before is not None:
    end_time = f"{created_before}T08:00:00Z"
    querystring.update({"created_before": end_time})
def get_labels():
    """Print a numbered list of the labels returned by ``url_get_labels``."""
    label_rows = session.request("GET", url_get_labels).json()
    print("当前标记列表".center(50, '-'))
    for number, entry in enumerate(label_rows, start=1):
        print(f'{number}\t{entry["name"]} --> {entry["description"]}')
def get_total_issue():
    """Print the issue counts reported by the statistics endpoint.

    Raises:
        SystemExit: When the API answers with a non-success status.
    """
    print("\n", "查询概览".center(50, '-'))
    response = session.request("GET", url_get_count, params=querystring)
    # Decide on the HTTP status instead of searching the body text for
    # "404"/"401", which also matched legitimate counts such as 1404.
    if not response.ok:
        print('查询异常', response.text)
        raise SystemExit()
    payload = response.json()
    print(f"\nIssue Total --> labels >> {labels}\n\t{payload['statistics']['counts']}")
def get_label_time(issue, label_name='bug solution::fixed'):
    """Return when ``label_name`` was most recently added to ``issue``.

    The label events are only requested when the issue's label list contains
    the text ``fix``.

    Args:
        issue: Issue dict as returned by the issues API; ``labels``,
            ``project_id`` and ``iid`` are read.
        label_name: Exact name of the label to look for.

    Returns:
        str: The ``created_at`` value of the last matching "add" event, or
        ``''`` when there is none.
    """
    label_create_time = ''
    if 'fix' in str(issue['labels']):
        events = session.get(
            f"{base}projects/{issue['project_id']}/issues/{issue['iid']}/resource_label_events").json()
        for event in events:
            # NOTE(review): guards against an event whose "label" is null —
            # confirm against the API when labels get deleted.
            label = event.get('label') or {}
            # Only "add" events mark the moment the label was applied; a later
            # "remove" event must not overwrite that time.
            if label.get('name') == label_name and event.get('action', 'add') == 'add':
                label_create_time = event['created_at']
    return label_create_time
def get_issue():
    """Fetch every page of matching issues into ``issue_list``.

    Adds the ``state`` filter to ``querystring``, then follows the
    ``X-Next-Page`` response header until it is empty. Each issue's
    ``project_id`` is appended to ``list_id``.

    Raises:
        requests.HTTPError: When a page request returns an error status.
    """
    querystring.update({"state": labels_status})
    print(f"查询参数--> {querystring}")
    page = 1
    while True:
        querystring.update({"page": f"{page}"})
        response = session.request("GET", url_get_issue, params=querystring)
        # Fail loudly on 401/404 instead of iterating over an error body.
        response.raise_for_status()
        if page == 1:
            # The totals are informational only. Paging no longer depends on
            # X-Total-Pages, so a missing header cannot crash int(), and the
            # first page is no longer requested twice.
            total = response.headers.get('X-Total')
            total_pages = response.headers.get('X-Total-Pages')
            print(f"\n--> 查询{labels}状态 {labels_status} 总数量 {total} 总页数 {total_pages}")
        data = response.json()
        print(f"\t当前第 {page} 页Issue数量", len(data))
        issue_list.extend(data)
        list_id.extend(item["project_id"] for item in data)
        next_page = response.headers.get('X-Next-Page')
        if not next_page:  # empty or absent on the last page
            break
        page = int(next_page)
def search_name(project):
    """Return the decoded JSON of ``<base>projects/<project>``."""
    project_url = f"{base}projects/{project}"
    project_response = session.request("GET", project_url)
    return project_response.json()
def get_projectname():
    """Resolve project names for all collected ids and print a summary.

    Fills ``project_info`` (id -> name) for each distinct id in ``list_id``,
    then prints the current week number and the project names found.
    """
    for pid in set(list_id):  # de-duplicate before querying
        details = search_name(pid)
        project_info[details["id"]] = details["name"]
    today = datetime.date.today()
    # Week number counted in 7-day blocks from 1 January.
    days_elapsed = (today - datetime.date(today.year, 1, 1)).days
    current_week = days_elapsed // 7 + 1
    names = project_info.values()
    print(f"\n--> 今天是第{current_week}周\t存在'{labels}'项目 数量{len(names)}\n\t{names}")
def format_issue_list():
    """Optionally restrict ``issue_list`` to issues from listed authors.

    When ``-grep`` was given, the file it names is read as a
    whitespace-separated list of author names and ``issue_list`` is filtered
    in place to issues whose author name is in that list. Without ``-grep``
    the list is left untouched.
    """
    if not grep_author:
        print(f'\t提交人:不匹配指定作者')
        print('\t如需匹配指定作者,请创建一个文件,内容为`所匹配的作者姓名` 执行时添加对应参数 `-grep {file_name}`')
        print(f'-->不匹配数量 {len(issue_list)}')
        return
    # Read the file passed via -grep; the path used to be hard-coded to
    # ./group.txt, which ignored the argument.
    with open(grep_author, 'r') as read_file:
        name_grep = read_file.read().split()
    print(f'\t提交人:匹配指定作者 {name_grep}')
    # Exact-name membership. The previous substring test against the list's
    # repr also accepted partial names.
    wanted_authors = set(name_grep)
    # Slice assignment keeps the same list object that other functions use.
    issue_list[:] = [issue for issue in issue_list if issue["author"]["name"] in wanted_authors]
    print(f'-->匹配后剩余数量 {len(issue_list)}')
def save_csv():
    """Write one CSV row per entry of ``issue_list`` to ``write_name``.

    Each row holds the project name, issue id, a spreadsheet HYPERLINK
    formula as title, state, labels, author, assignees, days open (0 unless
    the state is ``opened``), URL, the created/fixed/closed dates and a
    severity derived from the labels. ``closed_at`` and ``title`` of every
    issue dict are overwritten in the process.
    """
    columns = ["项目Name", "BUG_ID", "BUG标题", "状态", "标签", "提交人", "指派人", "打开天数", "web_url",
               "打开时间", "修复时间", "关闭时间", '公式_标签', ]
    seconds_per_day = 24 * 60 * 60
    severity_keywords = (('low', '低'), ('medium', '中'), ('high', '高'))
    with open(write_name, mode="w", encoding="utf-8-sig", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, columns)
        writer.writeheader()
        total = len(issue_list)
        for position, issue in enumerate(issue_list):
            # Simple progress indicator, rewritten in place.
            print('issue_list剩余数量-->', total - position - 1, ' ', end='\r')
            created_time = issue['created_at'].split('T')[0]
            if issue["state"] == "opened":
                # Whole days between the creation date and now.
                created_epoch = time.mktime(time.strptime(created_time, '%Y-%m-%d'))
                count_days = int((time.time() - created_epoch) / seconds_per_day)
            else:
                count_days = 0
            assignees = [person['name'] for person in issue["assignees"]]
            fixed_time = get_label_time(issue).split('T')[0]
            if issue['closed_at'] is None:
                issue['closed_at'] = ''  # open issues have no close time
            closed_time = issue['closed_at'].split('T')[0]
            # Double quotes would break the formula string, so swap them out.
            quoted_title = str(issue['title']).replace('"', "'")
            issue['title'] = f'=HYPERLINK("{issue["web_url"]}","{quoted_title}")'
            # Severity: first matching keyword, only for bug-labelled issues.
            labels_text = str(issue['labels'])
            formula_labels = '未知等级'
            if 'bug' in labels_text:
                for keyword, level in severity_keywords:
                    if keyword in labels_text:
                        formula_labels = level
                        break
            row = {
                "项目Name": project_info[issue["project_id"]],
                "BUG_ID": issue["iid"],
                "BUG标题": issue["title"],
                "状态": issue["state"],
                "标签": issue["labels"],
                "提交人": issue["author"]["name"],
                "指派人": assignees,
                "打开天数": count_days,
                "web_url": issue["web_url"],
                "打开时间": created_time,
                "修复时间": fixed_time,
                "关闭时间": closed_time,
                '公式_标签': formula_labels,
            }
            writer.writerow(row)
    pwd = os.getcwd().replace('\\', '/')  # current directory with forward slashes
    print(f'\n-------- write done -------> file://{pwd}/{write_name}')
def sort():
    """Re-read the exported CSV, order it by project name and write it back."""
    print("排序中 ...")
    for option_name in ('display.max_columns', 'display.max_rows'):
        pd.set_option(option_name, None)
    frame = pd.read_csv(write_name, index_col="项目Name")
    frame.sort_values('项目Name', inplace=True)
    frame.to_csv(write_name)
    print("\n按 项目Name 排序 done")
if __name__ == '__main__':
    # The import guard at the top of this script already installs requests
    # and pandas when they are missing, so the unconditional
    # "pip install" on every run was dropped.
    # get_labels()  # print the available labels
    get_issue()
    get_projectname()
    format_issue_list()
    save_csv()
    sort()
自用模板
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2023-08-04 10:02:43
import argparse
import datetime
import json
import subprocess
import time
import requests
# Command-line interface: positional webhook URL plus the Jenkins job URL.
parser = argparse.ArgumentParser(description='Jenkins多JOB集合 发送消息到飞书',
                                 epilog="执行示例>>> python ${webhook} -job_url ${JOB_URL}")
parser.add_argument('webhook', help='机器人webhookURL')  # required
parser.add_argument('-job_url', '--JOB_URL', help='作业URL', required=True, )  # required
# Parse once and reuse the namespace instead of re-parsing per attribute.
args = parser.parse_args()
webhook = args.webhook
JOB_URL = args.JOB_URL
# Drop any trailing slash first so the result never contains "//lastBuild".
BUILD_URL = JOB_URL.rstrip('/') + '/lastBuild'
# Per-sub-job card cells, filled by set_msg().
job_name = []  # job names
job_duration = []  # job durations
job_status = []  # job statuses
job_url = []  # report links
pass_rate = []  # pass-rate text
#########
print('脚本修改时间:2023-09-18 10:02:43')  # changelog: fixed the Allure execution error
#########
session = requests.Session()
def _column(weight, elements, vertical_align=None):
    """Build one weighted card column; ``vertical_align`` is omitted when None."""
    column = {"tag": "column", "width": "weighted", "weight": weight}
    if vertical_align is not None:
        column["vertical_align"] = vertical_align
    column["elements"] = elements
    return column


def _centered(content):
    """Build a centre-aligned markdown card element."""
    return {"tag": "markdown", "content": content, "text_align": "center"}


def _ratio(passed, total):
    """Return ``passed / total``, or 0 when ``total`` is 0."""
    return passed / total if total != 0 else 0


def _collect_device_info():
    """Return ``(deviceid, os_version, browser_version, sn)`` from shell commands."""
    deviceid = subprocess.getoutput('cat /etc/ding_issue')
    os_version = subprocess.getoutput('cat /etc/issue')
    browser_version = subprocess.getoutput('google-chrome-stable --version')
    sn = subprocess.getoutput('hostname').split('-')[-1]
    return deviceid, os_version, browser_version, sn


def _send_text(text):
    """Post a plain-text message to the webhook and print the response."""
    msg = {"msg_type": "text", "content": {"text": text}}
    res_msg = requests.post(url=webhook, data=json.dumps(msg),
                            headers={"Content-Type": "application/json"}, timeout=30)
    print(res_msg.text)


def set_msg():
    """Collect MultiJob sub-build results and post an overview card to Feishu.

    Reads ``<BUILD_URL>/api/json``, then for every entry in ``subBuilds``
    fetches the Allure ``summary.json`` and ``history-trend.json`` widgets to
    compute the current and previous pass rates. The per-job cells are
    appended to the module-level lists and rendered into the card, whose
    footer also shows host details gathered from the local machine.

    Raises:
        SystemExit: After notifying the webhook, when the build data cannot
            be fetched or decoded.
    """
    # Jenkins root URL with a trailing slash. Splitting on "/job/" rather
    # than the bare substring "job" keeps hosts such as "jobs.example.com"
    # intact.
    JENKINS_URL = BUILD_URL.split('/job/')[0] + '/'
    try:
        get_result = requests.get(f'{BUILD_URL}/api/json', auth=jenkins_auth, timeout=30)
        print(f"数据查询API地址 {get_result.url}")
        result = get_result.json()
    except OSError:  # network-level failure
        _send_text("获取报告地址异常/无法访问")
        raise SystemExit('获取报告地址异常/无法访问')
    except Exception as e:
        text = f"消息发送发生异常状态\t{type(e).__name__} --> {str(e)}\n检查API数据\t请手动前{BUILD_URL}查看报告"
        _send_text(text)
        raise SystemExit(text)
    # Take the first action that carries "causes" instead of assuming it is
    # actions[0]; other actions may precede it in the list.
    shortDescription = next(
        (action['causes'][0]['shortDescription'] for action in result['actions']
         if action and action.get('causes')),
        '')
    # Build start time, formatted in local time.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result['timestamp'] / 1000))
    if result['duration'] != 0:
        # Duration reported by the API, in milliseconds.
        duration = int(result['duration']) // 1000
        minutes, seconds = divmod(duration, 60)
        hours, minutes = divmod(minutes, 60)
        build_duration = f'{hours}h {minutes}m {seconds}s'
    else:
        # Zero duration: derive the elapsed time from the start timestamp.
        print("\t通过当前时间 计算持续时间")
        build_duration = datetime.datetime.now() - datetime.datetime.fromtimestamp(result['timestamp'] / 1000)
        build_duration = str(build_duration).split('.')[0]
    total_count = len(result['subBuilds'])
    print(f'\n\n======= 项目概览 ======= \n'
          f'shortDescription:{shortDescription}\nbuild duration:{build_duration}\ntotal_count:{total_count}\ntimestamp:{timestamp}\n')
    print('****** 提示: 没有allure就报错 无法运行 JSONDecodeError ******')
    # Pre-set so the summary print below works when there are no sub-builds.
    allure_change = 0
    for index in result['subBuilds']:
        report_url = f"{JENKINS_URL}{index['url']}/allure"
        try:
            summary_response = requests.get(f"{report_url}/widgets/summary.json", auth=jenkins_auth, timeout=30)
            trend_response = requests.get(f"{report_url}/widgets/history-trend.json", auth=jenkins_auth, timeout=30)
            allure_summary = summary_response.json()['statistic']
            trend = trend_response.json()
            try:
                # Entry 1 is used as the previous run.
                allure_history_trend = trend[1]
            except IndexError:
                print('没有历史数据')
                allure_history_trend = trend[0]
            allure_pass_rate = _ratio(allure_summary['passed'], allure_summary['total'])
            allure_history = _ratio(allure_history_trend['data']['passed'],
                                    allure_history_trend['data']['total'])
        except Exception as e:
            print("获取Allure报告异常", type(e).__name__, str(e))
            allure_pass_rate = allure_history = 0
            allure_summary = {'passed': None, 'total': None}
        # ------ styling: compare this run's pass rate with the previous one ------
        allure_change = allure_pass_rate - allure_history
        print(f"\t{index['jobName']} --> 本次比上次通过率 {allure_change}")
        if allure_pass_rate > allure_history:
            allure_pass_rate = f'<font color=green>↑{allure_pass_rate:.2%}</font>'
        elif allure_pass_rate < allure_history or allure_pass_rate == 0:
            allure_pass_rate = f'<font color=red>↓{allure_pass_rate:.2%}</font>'
        else:
            allure_pass_rate = f' {allure_pass_rate:.2%}'
        # ------ collect the card cells ------
        job_name.append(_centered(f"{index['jobName']}"))
        job_duration.append(_centered(f"{index['duration']}"))
        job_status.append(_centered(f"{index['result']}"))
        job_url.append(_centered(f"[查看]({report_url})"))
        pass_rate.append(_centered(f"{allure_summary['passed']}/{allure_summary['total']}{allure_pass_rate}"))
    print(
        f'\n\n======= 项目详情 ======= \n{job_name}\n{job_duration}\n{job_status}\n{pass_rate}\n{job_url}\n\t{allure_change}')
    # These four names were referenced by the card footer without ever being
    # defined, which raised NameError; gather them here.
    deviceid, os_version, browser_version, sn = _collect_device_info()
    # ------ card template ------
    overview_cells = (
        f"项目数量\n**{total_count}**\n",
        f"执行时间\n**{timestamp}**\n",
        f"持续时间\n**{build_duration}**\n",
    )
    card = json.dumps({
        "elements": [
            {"tag": "markdown", "content": "**项目总览**"},
            {
                "tag": "column_set",
                "flex_mode": "bisect",
                "background_style": "grey",
                "horizontal_spacing": "default",
                "columns": [
                    _column(1, [{"tag": "markdown", "text_align": "center", "content": cell}])
                    for cell in overview_cells
                ]
            },
            {"tag": "markdown", "content": "**项目信息**"},
            {
                # Header row of the per-job table.
                "tag": "column_set",
                "flex_mode": "none",
                "background_style": "grey",
                "columns": [
                    _column(3, [_centered("**项目名**")], "top"),
                    _column(2, [_centered("**运行时长**")], "top"),
                    _column(2, [_centered("**passed/total/通过率**")], "top"),
                    _column(1, [_centered("**Report**")], "top"),
                ]
            },
            {
                # Data rows: one list of cells per column.
                "tag": "column_set",
                "flex_mode": "none",
                "background_style": "default",
                "columns": [
                    _column(3, job_name, "top"),
                    _column(2, job_duration, "center"),
                    _column(2, pass_rate, "top"),
                    _column(1, job_url, "top"),
                ]
            },
            {"tag": "hr"},
            {
                "tag": "note",
                "elements": [
                    {
                        "tag": "img",
                        "img_key": f"{img_icon}",
                        "alt": {"tag": "plain_text", "content": ""}
                    },
                    {
                        "tag": "plain_text",
                        "content": f"{shortDescription}\n通过率 与上次执行变化:绿色为上升,红色下降"
                    },
                    {
                        "tag": "plain_text",
                        "content": f"系统:{os_version}\n浏览器:{browser_version}\nDeviceID:{deviceid}\nSN:{sn}"
                    }
                ]
            }
        ]
    })
    body = json.dumps({"msg_type": "interactive", "card": card})  # use the card built above
    headers = {"Content-Type": "application/json"}
    res = requests.post(url=webhook, data=body, headers=headers, timeout=30)
    print(f'消息发送响应 -->\n\t {res.text}')
if __name__ == '__main__':
    import os  # local import: only needed for the credential lookup below

    # img_key of the Jenkins icon uploaded to Feishu.
    img_icon = 'img_v2_098e80ae-e583-4148-b822-f42a05298d3g'
    # Jenkins (user, password). The JENKINS_USER / JENKINS_PASSWORD
    # environment variables override the built-in values so real
    # credentials need not be stored in the script.
    jenkins_auth = (os.environ.get('JENKINS_USER', 'auto'),
                    os.environ.get('JENKINS_PASSWORD', '1'))
    set_msg()
-
python ↩︎
文章来源地址:https://www.toymoban.com/news/detail-672755.html
到了这里,关于【无标题】jenkins消息模板(飞书)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!