提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合

这篇具有很好参考价值的文章主要介绍了提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、背景

日志是非常重要的信息资源。它们记录了应用程序的运行状态、错误和异常情况,帮助我们了解系统的健康状况以及发现潜在的问题。为了高效地管理和分析日志数据,许多组织采用了Elasticsearch、Logstash和Kibana(ELK)堆栈作为日志收集和分析的解决方案。

开发一个实时监控和告警脚本,专门用于监控ELK平台中的错误日志,并及时发送告警通知给相关人员。该系统将通过扫描Elasticsearch中的日志数据,筛选出等级为ERROR的错误日志,并根据预设的告警规则进行处理。

2、目的

使用Python从Elasticsearch中查询特定级别为ERROR的错误日志,并通过钉钉机器人实现告警聚合和发送,以提高错误日志的处理效率和及时响应能力。

为什么开发这个脚本?
因为目前我们这边没有监控日志的信息,出现问题不能及时发现 和预知
优势
1、消息进行聚合,每个项目的多条告警信息,汇总一条发送。突破钉钉机器人每分钟只能发送20条的限制
2、告警信息you太多的重复,进行去重处理,添加告警次数发送。防止被钉钉限流
提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合

3、原理

  1. 使用Python的Elasticsearch库连接到Elasticsearch集群。
  2. 构建Elasticsearch查询DSL(领域专用语言),过滤出级别为ERROR的日志记录。
  3. 执行查询并获取结果。
  4. 对查询结果进行聚合,统计每个项目的错误次数。
  5. 根据聚合结果,生成告警消息的Markdown格式内容。
  6. 使用钉钉机器人发送告警消息到指定的钉钉群。

4、流程

  1. 导入必要的Python库,包括elasticsearchrequests
  2. 创建Elasticsearch连接,指定Elasticsearch集群的主机和端口。
  3. 构建Elasticsearch查询DSL,设置查询条件为日志级别为ERROR。
  4. 执行查询,获取查询结果。
  5. 对查询结果进行处理,聚合每个项目的错误次数。
  6. 根据聚合结果生成告警消息的Markdown内容。
  7. 使用钉钉机器人API发送告警消息到指定的钉钉群。

5、实现代码

提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合


# -*- coding: utf-8 -*-
# @Time    : 2023/6/17 18:11
# @Author  : 南宫乘风
# @Email   : 1794748404@qq.com
# @File    : all_es.py
# @Software: PyCharm
from collections import Counter
from datetime import datetime, timedelta

import requests
from elasticsearch import Elasticsearch

from monitor.es_ding import send_pretty_message

# Elasticsearch客户端实例
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxxx'),
                   sniff_on_start=True,  # 连接前测试
                   sniff_on_connection_fail=True,  # 节点无响应时刷新节点
                   sniff_timeout=300,  # 设置超时时间
                   headers={'Content-Type': 'application/json'})


def format_timestamp(timestamp):
    """格式化时间为Elasticsearch接受的字符串格式"""
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")


def search_errors():
    """执行查询,获取错误日志数据"""
    current_time = datetime.now()
    one_minute_ago = current_time - timedelta(minutes=10)
    current_time_str = format_timestamp(current_time)
    one_minute_ago_str = format_timestamp(one_minute_ago)

    index = 'app-prod-*'  # 替换为实际的索引名称

    query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": one_minute_ago_str,
                                "lt": current_time_str,
                                "format": "yyyy-MM-dd HH:mm:ss",
                                "time_zone": "+08:00"
                            }
                        }
                    },
                    {
                        "match": {
                            "loglevel": "ERROR" #匹配项目错误等级
                        }
                    },
                    {
                        "bool": {
                            "must_not": [
                                {
                                    "match": {
                                        "projectname": "fox-data-spiderman" # 需要屏蔽的项目
                                    }
                                }
                            ]
                        }
                    }
                ]
            }
        },
        "_source": [  ## 输出的字段
            "date",
            "projectname",
            "threadname",
            "msg"
        ],
        "from": 0,
        "size": 10000, # 返回查询的条数
    }

    result = es.search(index=index, body=query)
    total_documents = result["hits"]["total"]["value"]
    print(f"总共匹配到 {total_documents} 条文档")

    result = result['hits']['hits']
    all_result = []

    for i in result:
        all_result.append(i['_source'])

    msg_counter = Counter(d['msg'] for d in all_result if 'msg' in d)
    results = []

    for d in all_result:
        if 'msg' in d and d['msg'] in msg_counter:
            count = msg_counter[d['msg']]
            del msg_counter[d['msg']]
            d['count'] = count
            d['msg'] = d['msg'][:100] + ('...' if len(d['msg']) > 100 else '')
            results.append(d)

    return results


def aggregate_errors(results):
    """按项目名称聚合错误日志"""
    aggregated_data = {}
    for d in results:
        projectname = d.get('projectname')
        if projectname:
            if projectname not in aggregated_data:
                aggregated_data[projectname] = []
            aggregated_data[projectname].append({'date': d.get('date'), 'msg': d.get('msg'), 'count': d.get('count')})
    return aggregated_data


def generate_summary(projectname, messages):
    """生成Markdown格式的消息摘要"""
    markdown_text = f'### {projectname} \n\n'
    for message in messages:
        markdown_text += f"**时间:** {message['date']}\n\n"
        markdown_text += f"**告警次数:** <font color='red'><b>{message['count']}</b></font>\n\n"
        markdown_text += f"{message['msg']}\n\n---\n\n"
    return markdown_text


def send_message_summary(projectname, messages):
    """发送摘要消息给钉钉机器人"""
    summary = generate_summary(projectname, messages)
    data = {
        'msgtype': 'markdown',
        'markdown': {
            'title': f'{projectname}消息告警',
            'text': summary
        }
    }
    webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxx'  # 替换为实际的Webhook URL
    response = requests.post(webhook_url, json=data)
    if response.status_code == 200:
        print('消息发送成功')
    else:
        print('消息发送失败')


if __name__ == '__main__':
    errors = search_errors()
    aggregated_errors = aggregate_errors(errors)

    for projectname, messages in aggregated_errors.items():
        print(f"{projectname}:")
        print(messages)

提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合

6、Crontab添加定时任务

也可以用采用:Jenkins与GitLab的定时任务工作流程
https://blog.csdn.net/heian_99/article/details/131164591?spm=1001.2014.3001.5501

#日志
*/2 * * * * cd /python_app/elasticsearch; /opt/anaconda3/envs/py38/bin/python -u  es_monitor.py >> es_error_info.log 2>&1

该定时任务的含义是每隔2分钟执行一次指定目录下的 es_monitor.py 脚本,并将输出信息追加到 es_error_info.log 文件中。这样可以定期监控 Elasticsearch 的错误日志,并记录相关信息以便后续查看和分析。

7、总结

本博客,为我们构建了一个完整的应用日志监控和告警系统,通过ELK技术栈和钉钉机器人的结合,使得我们能够及时发现和处理应用中的错误,提高了团队的工作效率和系统的稳定性。文章来源地址https://www.toymoban.com/news/detail-506846.html

到了这里,关于提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 从零开始学习Web自动化测试:如何使用Selenium和Python提高效率?

    目录 引言: 一、了解Web自动化测试的基本概念 二、选择Web自动化测试工具 三、学习Web自动化测试的技能 四、实践Web自动化测试 五、总结 随着互联网的不断发展,Web自动化测试越来越受到关注。Web自动化测试可以帮助我们轻松地检查网站的功能和性能,提高软件开发的效率

    2024年02月01日
    浏览(51)
  • 视频增强与压缩:提高视频处理效率的关键

    视频处理是现代计算机视觉和人工智能领域的一个关键技术,它涉及到对视频数据进行处理、分析、压缩和增强等多种操作。随着互联网和移动互联网的发展,视频数据的产生和传播速度越来越快,这为视频处理技术带来了巨大挑战。在这篇文章中,我们将深入探讨视频增强

    2024年02月21日
    浏览(61)
  • 学会RabbitMQ的延迟队列,提高消息处理效率

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 RabbitMQ 能保证消息可靠性吗 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 真的好用吗?鲜有人提的 RabbitMQ-RPC模式 前面

    2024年02月14日
    浏览(67)
  • MySQL表操作:提高数据处理效率的秘诀(进阶)(2)

    💕“学习难免有坎坷,重要的是你能尽力而为,持之以恒。”💕 🐼作者:不能再留遗憾了🐼 🎆专栏:MySQL学习🎆 🚗本文章主要内容:MySQL表操作进阶:聚合查询和联合查询🚗 前面我们学习了MySQL进阶的数据库约束、表的设计和新增,那么今天我将为大家分享MySQL表查询

    2024年02月08日
    浏览(50)
  • MySQL表操作:提高数据处理效率的秘诀(进阶)(1)

    💕**“生命不在于相信奇迹,而在于创造奇迹。”——朱学恒**💕 🐼作者:不能再留遗憾了🐼 🎆专栏:MySQL学习🎆 🚗本文章主要内容:MySQL对表操作进阶。数据库约束、表的设计、新增,后续会更新进阶表的查询🚗 前面我们已经介绍过初识MySQL以及MySQL对库和对表操作(

    2024年02月08日
    浏览(43)
  • 自然语言处理与大数据:如何提高数据分析效率

    自然语言处理(NLP,Natural Language Processing)是计算机科学与人工智能领域的一个分支,研究如何让计算机理解、生成和处理人类语言。自然语言处理技术广泛应用于各个领域,包括机器翻译、语音识别、情感分析、文本摘要等。 随着数据的大量生成和存储,大数据技术已经成为

    2024年04月09日
    浏览(61)
  • 如何提高倾斜摄影超大场景的三维模型轻量化处理速度和效率?

     倾斜摄影超大场景的三维模型轻量化处理是将高精度的三维模型进行降采样、简化等处理,以达到减少数据大小和提高渲染性能的目的。为了提高轻量化处理速度,可以从以下方面入手: 1、选择合适的轻量化算法。当前已有很多成熟的三维模型轻量化算法,如基于多分辨率

    2024年02月01日
    浏览(75)
  • 低代码核心能力表单引擎可以提高业务处理效率,降低成本的

    在数字化时代,企业面临着海量的数据和复杂的业务需求,对于低代码表单的需求也逐渐增加,低代码表单可以提高企业的业务处理效率,还可以降低开发成本,缩短开发周期。 低代码的表单主要用于数据采集、流程审批和业务运营等场景,可以帮助企业更高效地管理数据和

    2024年02月04日
    浏览(44)
  • 数据建模的云计算支持:利用云计算资源提高数据处理效率

    数据建模是数据科学和机器学习领域中的一个重要环节,它涉及到将实际问题转化为数学模型的过程。随着数据规模的不断扩大,传统的数据处理方法已经无法满足需求,因此需要寻找更高效的数据处理方法。云计算是一种基于互联网的计算资源分配和共享方式,它可以提供

    2024年04月28日
    浏览(42)
  • 三维模型3DTile格式轻量化压缩处理效率提高的技术方浅析

    随着三维模型在各个领域的广泛应用,对于其格式的轻量化压缩处理和效率提高的需求也越发迫切。本文将介绍一些技术方法,帮助实现三维模型3DTile格式的轻量化压缩处理并提高处理效率。 首先,针对三维模型的轻量化压缩处理,我们可以采用以下方法: 1、减少顶点数:

    2024年02月09日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包