Python实现企业微信告警
1. 创建企业微信群机器人
1-1. 什么是企业微信群机器人?
企业微信群机器人是企业微信平台提供的一种功能,可以通过Webhook方式将消息发送到指定的企业微信群中。它可以用于自动化发送通知、告警等信息,实现监控和信息共享。
1-2. 为什么使用企业微信群机器人进行告警通知?
在企业中,监控和告警是至关重要的。当系统出现异常、故障或重要事件发生时,及时通知相关人员是保障业务稳定运行的关键。企业微信群机器人提供了一种方便、快速、可定制的告警通知方式,帮助团队及时响应和处理问题。
1-3. 添加企业微信群机器人
在企业微信群聊里添加机器人
- 添加机器人
- 为机器人起名、自定义头像
- 获取Webhook机器人地址
这里的Webhook机器人地址,后续用来接收消息,复制保存下来。
地址url格式:‘https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c0cda19e-9523-4020-a476-xxxxxxxxxxxx’
配置说明中可以查看机器人详细的使用介绍
- 查看推送消息示例
- 机器人配置说明
- 推送消息配置
2. 推送消息demo测试
利用curl
发送消息至Webhook机器人地址,机器人将消息发送至群里
参数介绍:
参数 | 必要参数 | 说明 |
---|---|---|
msgtype | true | 消息类型,此时固定为text |
content | true | 文本内容,最长不超过2048个字节,必须是utf8编码 |
mentioned_list | false | userid的列表,提醒群中的指定成员(@某个成员),@all表示提醒所有人 |
mentioned_mobile_list | false | 手机号列表,提醒手机号对应的群成员(@某个成员),@all表示提醒所有人 |
命令行demo (直接在服务器命令行执行如下代码):
curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c0cda19e-9523-4020-a476-xxxxxxxxxxxx' \
-H 'Content-Type: application/json' \
-d '
{
"msgtype": "text",
"text": {
"content": "存储空间超过90%,请登录prd-etl01服务器进行处理"
}
}'
{"errcode":0,"errmsg":"ok
- 查看企业微信群
可以看到,发送的json中, “content”: “存储空间超过90%,请登录prd-etl01服务器进行处理”,已经发送至企业微信群中,这是最简单的demo实现。
3.使用Python发送告警消息
3-1. 文本类型告警发送
原理是利用Python发送POST请求
通过Python的
requests
库,可以轻松地发送POST请求到Webhook URL,实现消息的发送。
[root@wangting monitor]# cat monitor_wechat.py
# -*- coding: utf-8 -*-
# Created on 2023年08月24日
# @author: wangting
import requests
import json
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c0cda19e-9523-4020-a476-xxxxxxxxxxxx'
headers = {'content-type': 'application/json'}
data = {
"msgtype": "text",
"text": {
"content": "机房dolphin工作流check任务执行失败",
"mentioned_list": ["王亭", "@all"],
}
}
data = json.dumps(data)
print(requests.post(url=url, headers=headers, data=data))
运行Python脚本
[root@wangting monitor]# python3 monitor_wechat.py
<Response [200]>
如果requests、json模块未安装,可以使用
pip install
进行安装# 模块安装 [root@wangting monitor]# pip3 install simplejson [root@wangting monitor]# pip3 install requests
- 查看企业微信群效果
3-2. 图文类型告警发送
还可以通过企业微信机器人发送带有图片的消息,以便更直观地展示问题。以下是一个示例,演示如何发送带有图片的消息
[root@wangting monitor]# cat monitor_wechat_2.py
# -*- coding: utf-8 -*-
# Created on 2023年08月24日
# @author: wangting
import requests
import json
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c0cda19e-9523-4020-xxxxxxxxxxxx'
headers = {'content-type': 'application/json'}
data = {
"msgtype": "news",
"news": {
"articles" : [
{
"title" : "<危险|撤退>",
"description" : "有内鬼,终止交易!",
"url" : "https://osswangting.oss-cn-shanghai.aliyuncs.com/monitor/warning.jpg",
"picurl" : "https://osswangting.oss-cn-shanghai.aliyuncs.com/monitor/warning.jpg"
}
]
}
}
data = json.dumps(data)
print(requests.post(url=url, headers=headers, data=data))
运行Python脚本
[root@wangting monitor]# python3 monitor_wechat_2.py
<Response [200]>
- 查看企业微信群效果
“url” : “https://osswangting.oss-cn-shanghai.aliyuncs.com/monitor/warning.jpg”,这里的url和值表示点击企业微信群告警信息中的图片后,可以跳转的地址,例如一般可以加监控grafana的监控项URL等等
参数说明:
参数 | 必要参数 | 说明 |
---|---|---|
msgtype | true | 消息类型,此时固定为news |
articles | true | 图文消息,一个图文消息支持1到8条图文 |
title | true | 标题,不超过128个字节,超过会自动截断 |
description | false | 描述,不超过512个字节,超过会自动截断 |
url | true | 点击图片后跳转的链接 |
picurl | false | 图文消息的图片链接,支持JPG、PNG格式,较好的效果为大图 1068455,小图150150 |
3-3. 定时任务告警信息发送
可以设置定时任务,定期发送监控摘要到企业微信群,以便团队及时了解系统状态。以下是一个示例,演示如何设置定时任务发送监控摘要
需要使用Python schedule
模块
[root@wangting monitor]# pip3 install schedule
Collecting schedule
Downloading http://mirrors.cloud.aliyuncs.com/pypi/packages/eb/3b/040bd180eaef427dd160562ee66adc9f4f67088185c272edcdb899c609c7/schedule-1.1.0-py2.py3-none-any.whl
Installing collected packages: schedule
Successfully installed schedule-1.1.0
脚本内容:
[root@wangting monitor]# cat monitor_wechat_3.py
# -*- coding: utf-8 -*-
# Created on 2023年08月24日
# @author: wangting
import requests
import schedule
url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c0cda19e-9523-4020-xxxxxxxxxxxx'
def schedule_monitor():
data = {
"msgtype": "text",
"text": {
"content": "定时巡检:机房dolphin工作流check任务执行失败",
"mentioned_list": ["王亭", "@all"],
}
}
res = requests.post(url, json=data)
schedule.every().minutes.do(schedule_monitor)
while True:
schedule.run_pending()
# schedule.clear() # 取消任务方法
# schedule.every().day.do(schedule_monitor).run() # 只运行当前一次
运行Python脚本
[root@wangting monitor]# python3 monitor_wechat_3.py
- 查看企业微信群效果
schedule模块常用时间示例
# 秒
schedule.every().seconds # 每秒运行一次
schedule.every(2).seconds # 每2秒运行一次
schedule.every(1).to(5).seconds # 每1-5秒运行一次
# 分钟
schedule.every().minutes # 每分钟运行一次
# 小时
schedule.every().hour # 每小时运行一次
# 天
schedule.every().day # 每天运行一次如果后面没有at表示每天当前时间执行一次
schedule.every().day.at("00:00"). # 每天凌晨运行一次
# 周
schedule.every().week # 每周凌晨运行一次
schedule.every().wednesday.at("00:00") # 每周三凌晨运行一次
# at 常用值
at(HH:MM:SS) # 准确时分秒
every().hour.at(':30') # 每小时的30分
every().minute.at(':30') # 每一分钟的30秒
# 每8周执行一次
schedule.every(8).weeks.do(job)
3-4.获取数据库状态信息发送告警
测试表:
CREATE TABLE `monitor_table` (
`id` int(11) NOT NULL,
`monitor_info` varchar(255) DEFAULT NULL,
`update` timestamp NULL DEFAULT NULL,
`status` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
样例数据:
MariaDB [test]> select * from monitor_table ;
+----+--------------+---------------------+---------+
| id | monitor_info | update | status |
+----+--------------+---------------------+---------+
| 1 | mysql_port | 2023-08-24 14:39:51 | success |
| 2 | mysql_port | 2023-08-24 14:43:14 | error |
+----+--------------+---------------------+---------+
需要使用Python pymysql
模块
脚本内容:
# -*- coding: utf-8 -*-
# Created on 2023年08月24日
# author: wangting
import pymysql.cursors
import requests
import json
# 消息发送
def postmsg(url, post_data):
post_data = '{"msgtype": "markdown", "markdown": {"content": "%s"}}' % post_data
print(post_data)
if url == '':
print('URL地址为空!')
else:
r = requests.post(url, data=post_data.encode())
rstr = r.json()
if r.status_code == 200 and 'error' not in rstr:
result = '发送成功'
return result
else:
return 'Error'
# 数据库链接
def querySQL():
conn = pymysql.connect(
host='wangting_host',
user='root',
password='123456',
database='test'
)
cursor = conn.cursor()
sql = "select monitor_info, status from monitor_table order by id desc limit 1;"
cursor.execute(sql)
row = cursor.fetchone()
if row is None:
print('没有数据')
outmsg = "## 状态信息: \n" + "<font color='info'>暂无新增数据</font>"
return outmsg
else:
outmsg = "## 告警信息:\n"
while row:
color = "info" # 默认颜色为蓝色
if row[1] == "success":
color = "success" # 绿色
elif row[1] == "error":
color = "warning" # 红色(实际中颜色可能有所不同)
outmsg = outmsg + f"> <font color='{color}'>-【{row[0]}】</font>, {row[1]}\n"
row = cursor.fetchone()
cursor.close()
conn.close()
return outmsg
if __name__ == '__main__':
url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c0cda19e-9523-4020-xxxxxxxxxxxx"
post_data = querySQL()
result = postmsg(url, post_data)
运行脚本后查看效果
文章来源:https://www.toymoban.com/news/detail-671468.html
4. 总结
企业微信群机器人是一个强大的工具,用于实现监控告警和信息通知。机器人发送不同类型的消息通知,我们合理利用企业微信群机器人,可以提升团队的响应速度和业务稳定性,在实际工作中提升效率和效果。文章来源地址https://www.toymoban.com/news/detail-671468.html
到了这里,关于Python实现企业微信群告警的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!