Alerting on Metricbeat and Heartbeat Data with Python


1. System architecture

IP           Hostname   Role                        Notes
11.0.1.11    kafka1     Kafka and MySQL
11.0.1.12    kafka2     Kafka
11.0.1.13    kafka3     Kafka
11.0.1.14    demo1      Metricbeat and Heartbeat

2. Deploying Kafka
Omitted.
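
If you want to pre-create the Kafka topics used in the rest of this article instead of relying on broker auto-creation, a kafka-python sketch along these lines works; the partition and replication counts here are just examples:

from kafka.admin import KafkaAdminClient, NewTopic

# Create the topics used by the beats, the enrichment scripts and the alerting scripts
admin = KafkaAdminClient(bootstrap_servers=['11.0.1.11:9092', '11.0.1.12:9092', '11.0.1.13:9092'])
topics = [
    NewTopic(name=name, num_partitions=3, replication_factor=2)
    for name in ['ELK-metricbeat', 'ELK-heartbeat',
                 'ELK-system_metricbeat', 'ELK-system_heartbeat', 'ELK-alarm']
]
admin.create_topics(new_topics=topics)
admin.close()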

3. Deploying Metricbeat and Heartbeat
Metricbeat configuration:

metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

fields:
  ip: 11.0.1.14

output.kafka:
  hosts: ["11.0.1.11:9092","11.0.1.12:9092","11.0.1.13:9092"]
  topic: "ELK-metricbeat"

Heartbeat configuration:

heartbeat.config.monitors:
  path: ${path.config}/monitors.d/*.yml
  reload.enabled: false
  reload.period: 5s

# ----------------------------  Kafka Output ----------------------------
output.kafka:
  hosts: ["11.0.1.11:9092","11.0.1.12:9092","11.0.1.13:9092"]
  topic: "ELK-heartbeat"

Heartbeat tcp.yml monitor configuration:

- type: tcp 
  id: my-tcp-monitor
  name: My TCP monitor
  enabled: true
  schedule: '@every 20s' 
  hosts: ["11.0.1.14:80","11.0.1.13:80","11.0.1.12:80"]
  ipv4: true
  ipv6: true
  mode: all
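
The sample Heartbeat output shown later in this article comes from an ICMP monitor called "My ICMP Monitor", whose configuration is not listed here. A monitors.d/icmp.yml along the following lines would produce it; the schedule and host list are assumptions:

- type: icmp
  id: my-icmp-monitor
  name: My ICMP Monitor
  enabled: true
  schedule: '@every 15s'
  hosts: ["11.0.1.14"]
  ipv4: true
  mode: all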

4. MariaDB table structures
cmdb_app table (application system information):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cmdb_app
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_app`;
CREATE TABLE `cmdb_app`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

Field descriptions:
app_name: application system name
ops_user: operations engineer's name
ops_tel: operations engineer's phone number
ops_dep: department responsible for operations

cmdb_os table (server information):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cmdb_os
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_os`;
CREATE TABLE `cmdb_os`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `eip` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

Field descriptions:
app_name: application system name
eip: server IP address
module: server role/purpose
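
To make the enriched output shown later concrete, here are example rows consistent with it; the values are taken from the sample messages further down, so adjust them to your own environment:

INSERT INTO cmdb_app (app_name, ops_user, ops_tel, ops_dep)
VALUES ('应用系统', '运维仔', '12345678901', '运维部');

INSERT INTO cmdb_os (app_name, eip, module)
VALUES ('应用系统', '11.0.1.14', 'demo1');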

alert_list table (alert records):

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for alert_list
-- ----------------------------
DROP TABLE IF EXISTS `alert_list`;
CREATE TABLE `alert_list`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `timestamp` datetime NULL DEFAULT NULL,
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `status` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

5. Python: consume from Kafka, join the cmdb_os and cmdb_app records with each message by the IP it carries, and write the enriched data to a new Kafka topic

Install the dependencies (mysql-connector-python is only needed by the alerting script in section 6):

pip install kafka-python pymysql mysql-connector-python apscheduler pyyaml

Let's start with metricbeat_replace.py:

import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

class DatabaseConnectionError(Exception):
    def __init__(self, message="数据库连接失败"):
        self.message = message
        super().__init__(self.message)

class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None

        # Initialize the background scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()

        # Reload the CMDB data every 10 minutes (at :00, :10, :20, :30, :40 and :50)
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)

        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)

        # Rotating file handler at DEBUG level: max file size 1 MB, keep 3 backups
        fh = RotatingFileHandler('metricbeat_replace.log', maxBytes=1e6, backupCount=3)
        fh.setLevel(logging.DEBUG)

        # Log format
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

        # Attach the formatter to both handlers
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)

        # Attach both handlers to the logger
        logger.addHandler(ch)
        logger.addHandler(fh)

        return logger

    def start_processing(self):
        self.connect_to_database()  # Verify the database connection once at startup
        self.load_cmdb_data()  # Load the CMDB data into memory at startup

        self.logger.info("开始处理...")

        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest'
        )
        self.logger.info("Kafka 消费者已创建.")

        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka 生产者已创建.")

        try:
            for msg in consumer:
                metricbeat_data = msg.value.decode('utf-8')
                ip = self.extract_ip(metricbeat_data)
                cmdb_data = self.get_cmdb_data(ip)
                self.process_and_send_message(producer, metricbeat_data, cmdb_data)

        except KeyboardInterrupt:
            self.logger.info("接收到 KeyboardInterrupt。正在优雅地关闭。")
        except Exception as e:
            self.logger.error(f"发生错误:{str(e)}")
            # Add any extra handling for processing failures here if needed
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None  # Ensure db is defined even if pymysql.connect() raises
        try:
            self.logger.info("正在连接数据库...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            self.logger.info("数据库连接成功.")
            self.db_connection_error_logged = False  # Reset the connection-error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"连接数据库时发生错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("开始加载数据.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            cursor = db.cursor()

            # Query the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()

            # Query the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()

            # Cache both result sets in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }

            self.logger.info("数据加载完成.")

        except pymysql.Error as e:
            error_message = f"加载数据时发生数据库错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("数据库连接异常,继续沿用上次的 CMDB 数据.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_ip(metricbeat_data):
        data = json.loads(metricbeat_data)
        return data.get('fields', {}).get('ip', '')

    def get_cmdb_data(self, ip):
        if self.cmdb_data:
            # Look up the matching records in the in-memory CMDB copy
            cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
            if not cmdb_os_data:
                return None
            cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
            if not cmdb_app_data:
                return None
            return cmdb_os_data, cmdb_app_data
        else:
            return None

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data

        new_message = json.loads(original_data_str)

        if cmdb_data:
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None

        producer.send(self.kafka_config['output_topic'], value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()

if __name__ == "__main__":
    try:
        with open('application.yml', 'r') as config_file:
            config_data = yaml.safe_load(config_file)

        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})

        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()

    except FileNotFoundError:
        print("错误:找不到配置文件 'application.yml'。")
    except Exception as e:
        print(f"发生意外错误:{str(e)}")

application.yml:

kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'metricbeat_replace'
  input_topic: 'ELK-metricbeat'
  output_topic: 'ELK-system_metricbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'

A processed message looks like this:

{"@timestamp": "2024-01-20T14:02:34.706Z", "@metadata": {"beat": "metricbeat", "type": "_doc", "version": "8.11.1"}, "host": {"name": "demo1"}, "agent": {"type": "metricbeat", "version": "8.11.1", "ephemeral_id": "979b3ab7-80af-4ab5-a552-3692165b7000", "id": "982d0bd1-d0d9-45b5-bc78-0a5f25911c12", "name": "demo1"}, "metricset": {"name": "memory", "period": 10000}, "event": {"module": "system", "duration": 120280, "dataset": "system.memory"}, "service": {"type": "system"}, "system": {"memory": {"used": {"pct": 0.2325, "bytes": 919130112}, "free": 3034763264, "cached": 529936384, "actual": {"used": {"pct": 0.1493, "bytes": 590319616}, "free": 3363573760}, "swap": {"total": 2147479552, "used": {"bytes": 0, "pct": 0}, "free": 2147479552}, "total": 3953893376}}, "fields": {"ip": "11.0.1.14"}, "ecs": {"version": "8.0.0"}, "cmdb_data": {"app_name": "应用系统", "eip": "11.0.1.14", "module": "demo1", "ops_user": "运维仔", "ops_tel": "12345678901", "ops_dep": "运维部"}}
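
A quick way to confirm that the enrichment works is to tail the output topic with a throwaway consumer; this is only a sanity-check sketch, not part of the project:

from kafka import KafkaConsumer
import json

# Tail ELK-system_metricbeat and print each message's IP together with the attached cmdb_data
consumer = KafkaConsumer(
    'ELK-system_metricbeat',
    bootstrap_servers=['11.0.1.11:9092', '11.0.1.12:9092', '11.0.1.13:9092'],
    auto_offset_reset='latest'
)
for msg in consumer:
    doc = json.loads(msg.value.decode('utf-8'))
    print(doc.get('fields', {}).get('ip'), doc.get('cmdb_data'))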

heartbeat_replace.py:

import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

class DatabaseConnectionError(Exception):
    def __init__(self, message="数据库连接失败"):
        self.message = message
        super().__init__(self.message)

class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None

        # Initialize the background scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()

        # Reload the CMDB data every 10 minutes (at :00, :10, :20, :30, :40 and :50)
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)

        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)

        # Rotating file handler at DEBUG level: max file size 1 MB, keep 3 backups
        fh = RotatingFileHandler('heartbeat_replace.log', maxBytes=1e6, backupCount=3)
        fh.setLevel(logging.DEBUG)

        # Log format
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

        # Attach the formatter to both handlers
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)

        # Attach both handlers to the logger
        logger.addHandler(ch)
        logger.addHandler(fh)

        return logger

    def start_processing(self):
        self.connect_to_database()  # Verify the database connection once at startup
        self.load_cmdb_data()  # Load the CMDB data into memory at startup

        self.logger.info("开始处理...")

        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest'
        )
        self.logger.info("Kafka 消费者已创建.")

        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka 生产者已创建.")

        try:
            for msg in consumer:
                heartbeat_data = msg.value.decode('utf-8')
                ip = self.extract_url_domain(heartbeat_data)
                cmdb_data = self.get_cmdb_data(ip)
                self.process_and_send_message(producer, heartbeat_data, cmdb_data)

        except KeyboardInterrupt:
            self.logger.info("接收到 KeyboardInterrupt。正在优雅地关闭。")
        except Exception as e:
            self.logger.error(f"发生错误:{str(e)}")
            # Add any extra handling for processing failures here if needed
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None  # Ensure db is defined even if pymysql.connect() raises
        try:
            self.logger.info("正在连接数据库...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            self.logger.info("数据库连接成功.")
            self.db_connection_error_logged = False  # Reset the connection-error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"连接数据库时发生错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("开始加载数据.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            cursor = db.cursor()

            # Query the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()

            # Query the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()

            # Cache both result sets in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }

            self.logger.info("数据加载完成.")

        except pymysql.Error as e:
            error_message = f"加载数据时发生数据库错误:{str(e)}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("数据库连接异常,继续沿用上次的 CMDB 数据.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_url_domain(heartbeat_data):
        data = json.loads(heartbeat_data)
        return data.get('url', {}).get('domain', '')

    def get_cmdb_data(self, ip):
        if self.cmdb_data:
            # Look up the matching records in the in-memory CMDB copy (ip here is the monitor's url.domain)
            cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
            if not cmdb_os_data:
                return None
            cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
            if not cmdb_app_data:
                return None
            return cmdb_os_data, cmdb_app_data
        else:
            return None

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data

        new_message = json.loads(original_data_str)

        if cmdb_data:
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None

        producer.send(self.kafka_config['output_topic'], value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()

if __name__ == "__main__":
    try:
        with open('application.yml', 'r') as config_file:
            config_data = yaml.safe_load(config_file)

        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})

        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()

    except FileNotFoundError:
        print("错误:找不到配置文件 'application.yml'。")
    except Exception as e:
        print(f"发生意外错误:{str(e)}")

Its application.yml:

kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'heartbeat_replace'
  input_topic: 'ELK-heartbeat'
  output_topic: 'ELK-system_heartbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'

A processed message looks like this:

{"@timestamp": "2024-01-20T14:03:37.102Z", "@metadata": {"beat": "heartbeat", "type": "_doc", "version": "8.11.1"}, "monitor": {"name": "My ICMP Monitor", "type": "icmp", "id": "my-icmp-monitor", "status": "up", "check_group": "b4caac6d-b79c-11ee-bf86-000c29a1adec-1", "duration": {"us": 131}, "ip": "11.0.1.14", "timespan": {"gte": "2024-01-20T14:03:37.102Z", "lt": "2024-01-20T14:03:53.102Z"}}, "url": {"domain": "11.0.1.14", "full": "icmp://11.0.1.14", "scheme": "icmp"}, "fields": {"nodename": "demo1"}, "summary": {"retry_group": "b4caac6d-b79c-11ee-bf86-000c29a1adec", "attempt": 1, "max_attempts": 1, "final_attempt": true, "up": 1, "down": 0, "status": "up"}, "state": {"id": "default-18d1d73022a-0", "up": 32661, "down": 0, "ends": null, "started_at": "2024-01-19T00:41:32.970059265+08:00", "duration_ms": "163324132", "status": "up", "checks": 32661, "flap_history": []}, "event": {"type": "heartbeat/summary", "dataset": "icmp"}, "icmp": {"requests": 1, "rtt": {"us": 83}}, "ecs": {"version": "8.0.0"}, "agent": {"name": "demo1", "type": "heartbeat", "version": "8.11.1", "ephemeral_id": "46819a45-3552-4e57-91f3-e58ffb12c72a", "id": "d56462aa-6f6b-4237-8bfc-a93c7bf933f4"}, "cmdb_data": {"app_name": "应用系统", "eip": "11.0.1.14", "module": "demo1", "ops_user": "运维仔", "ops_tel": "12345678901", "ops_dep": "运维部"}}

Overall, metricbeat_replace.py and heartbeat_replace.py are essentially the same code: apart from swapping "metricbeat" for "heartbeat" in a few places, the only real difference is return data.get('fields', {}).get('ip', '') versus return data.get('url', {}).get('domain', '').
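
Applied to abbreviated versions of the sample messages above, the two extraction helpers behave like this (a standalone snippet purely for illustration):

import json

metricbeat_msg = '{"fields": {"ip": "11.0.1.14"}}'
heartbeat_msg = '{"url": {"domain": "11.0.1.14", "full": "icmp://11.0.1.14"}}'

print(json.loads(metricbeat_msg).get('fields', {}).get('ip', ''))   # -> 11.0.1.14
print(json.loads(heartbeat_msg).get('url', {}).get('domain', ''))   # -> 11.0.1.14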

6. Heartbeat alerting
heartbeat_alarm.py:

# heartbeat_alarm.py

import json
import logging
import mysql.connector
from collections import defaultdict
from datetime import datetime, timedelta
from kafka import KafkaConsumer
import yaml

# Configure the logger
logging.basicConfig(
    level=logging.INFO,
    filename='heartbeat_checker.log',
    format='%(asctime)s [%(levelname)s] - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class HeartbeatChecker:
    def __init__(self, config_path='application.yml'):
        # Initialize the HeartbeatChecker state
        self.config_path = config_path
        self.kafka_bootstrap_servers = None
        self.kafka_group_id = None
        self.kafka_topic = None
        self.mysql_host = None
        self.mysql_port = None
        self.mysql_user = None
        self.mysql_password = None
        self.mysql_database = None
        self.consecutive_down_threshold = None
        self.consecutive_up_threshold = None

        # Load the configuration from the YAML file
        self.load_config()
        self.kafka_consumer = None

    def load_config(self):
        try:
            # Load the configuration from the YAML file
            with open(self.config_path, 'r') as file:
                config = yaml.safe_load(file)

            # Extract the Kafka settings
            self.kafka_bootstrap_servers = config['kafka']['bootstrap_servers']
            self.kafka_group_id = config['kafka']['group_id']
            self.kafka_topic = config['kafka']['topic']

            # Extract the MySQL settings
            self.mysql_host = config['mysql']['host']
            self.mysql_port = config['mysql']['port']
            self.mysql_user = config['mysql']['user']
            self.mysql_password = config['mysql']['password']
            self.mysql_database = config['mysql']['database']

            # Extract the consecutive-down and consecutive-up thresholds
            self.consecutive_down_threshold = config['thresholds']['consecutive_down']
            self.consecutive_up_threshold = config['thresholds']['consecutive_up']

        except Exception as e:
            # Handle configuration loading errors
            logger.error(f"加载配置时发生错误: {e}")
            raise

    def create_kafka_consumer(self):
        try:
            # Create the Kafka consumer instance
            self.kafka_consumer = KafkaConsumer(
                self.kafka_topic,
                bootstrap_servers=self.kafka_bootstrap_servers,
                group_id=self.kafka_group_id,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                value_deserializer=lambda x: json.loads(x.decode('utf-8'))
            )
        except Exception as e:
            # Handle Kafka consumer creation errors
            logger.error(f"创建 Kafka Consumer 时发生错误: {e}")
            raise

    def check_heartbeat_alerts(self):
        # defaultdict holding the recent monitor statuses seen for each URL
        url_groups = defaultdict(list)
        mysql_connection = None

        try:
            # Create the Kafka consumer and connect to MySQL
            self.create_kafka_consumer()
            mysql_connection = mysql.connector.connect(
                host=self.mysql_host,
                port=self.mysql_port,
                user=self.mysql_user,
                password=self.mysql_password,
                database=self.mysql_database
            )
            mysql_cursor = mysql_connection.cursor()

            # Iterate over the Kafka messages
            for message in self.kafka_consumer:
                json_data = message.value
                url = json_data.get('url', {}).get('full')
                monitor_status = json_data.get('monitor', {}).get('status')
                timestamp_str = json_data.get('@timestamp')
                cmdb_data = json_data.get('cmdb_data')

                if url and monitor_status and timestamp_str:
                    timestamp = self.convert_to_local_time(timestamp_str)

                    if monitor_status == 'down' and not self.url_exists_down_in_mysql(mysql_cursor, url):
                        # Consecutive-down handling: the URL is not yet recorded as down in alert_list
                        url_groups[url].append(monitor_status)
                        mysql_cursor.fetchall()

                        if len(url_groups[url]) >= self.consecutive_down_threshold and all(
                                status == 'down' for status in url_groups[url][-self.consecutive_down_threshold:]):
                            self.send_alert(url)
                            self.write_to_mysql(mysql_cursor, timestamp, url, monitor_status, mysql_connection, cmdb_data)
                            url_groups[url] = []
                            logger.info(f"URL: {url} - 被添加到 MySQL 中")
                    elif monitor_status == 'up' and self.url_exists_down_in_mysql(mysql_cursor, url):
                        # Consecutive-up handling: the URL is currently recorded as down in alert_list
                        url_groups[url].append(monitor_status)
                        mysql_cursor.fetchall()

                        if len(url_groups[url]) >= self.consecutive_up_threshold and all(
                                status == 'up' for status in url_groups[url][-self.consecutive_up_threshold:]):
                            self.delete_from_mysql(mysql_cursor, url, mysql_connection)
                            url_groups[url] = []
                            logger.info(f"URL: {url} - 由于连续 up 被从 MySQL 中移除")

        except Exception as e:
            # Handle runtime errors
            logger.error(f"发生错误: {e}")

        finally:
            # Close the Kafka consumer and the MySQL connection
            if self.kafka_consumer:
                self.kafka_consumer.close()

            if mysql_connection:
                mysql_connection.close()

    def send_alert(self, url):
        # Log the alert
        logger.info(f"告警: URL {url} 连续 {self.consecutive_down_threshold} 次掉线")

    @staticmethod
    def write_to_mysql(cursor, timestamp, url, status, connection, cmdb_data=None):
        try:
            # Insert a row into MySQL, including the "cmdb_data" fields
            insert_query = """
                INSERT INTO alert_list (timestamp, url, status, app_name, module, ops_user, ops_tel, ops_dep)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                timestamp,
                url,
                status,
                cmdb_data.get('app_name', '') if cmdb_data else '',
                cmdb_data.get('module', '') if cmdb_data else '',
                cmdb_data.get('ops_user', '') if cmdb_data else '',
                cmdb_data.get('ops_tel', '') if cmdb_data else '',
                cmdb_data.get('ops_dep', '') if cmdb_data else ''
            ) if cmdb_data else (timestamp, url, status, '', '', '', '', ''))
            connection.commit()
            logging.info(f"Inserted into MySQL: URL {url}, Status {status}, cmdb_data {cmdb_data}")
        except Exception as e:
            # Handle MySQL insert errors
            logger.error(f"Error writing to MySQL: {e}")

    @staticmethod
    def delete_from_mysql(cursor, url, connection):
        try:
            # Delete the row from MySQL
            delete_query = "DELETE FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(delete_query, (url,))
            connection.commit()
            logging.info(f"从 MySQL 中删除: URL {url}")
        except Exception as e:
            # Handle MySQL delete errors
            logger.error(f"从 MySQL 中删除时发生错误: {e}")

    @staticmethod
    def url_exists_down_in_mysql(cursor, url):
        try:
            # Check whether the URL is currently recorded as down in MySQL
            query = "SELECT * FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(query, (url,))
            return bool(cursor.fetchone())
        except Exception as e:
            # Handle lookup errors
            logger.error(f"检查 URL 是否存在于 MySQL 中时发生错误: {e}")
            return False

    @staticmethod
    def convert_to_local_time(timestamp_str):
        # Convert the UTC timestamp to local time (assumes UTC+8)
        timestamp_utc = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        timestamp_local = timestamp_utc + timedelta(hours=8)
        return timestamp_local.strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    try:
        # Run the main program
        heartbeat_checker = HeartbeatChecker()
        heartbeat_checker.check_heartbeat_alerts()
    except KeyboardInterrupt:
        print("退出...")

Its application.yml:

# application.yml

kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  group_id: 'python_alert'
  topic: 'ELK-system_heartbeat'

mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  database: 'zll_python_test'

thresholds:
  consecutive_down: 1
  consecutive_up: 1

Here consecutive_down is how many consecutive 'down' results trigger an alert, and consecutive_up is how many consecutive 'up' results clear it again; a simplified illustration of this check follows below.
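
As a simplified, standalone illustration of that threshold check (this is not part of heartbeat_alarm.py, just the same idea in isolation):

from collections import defaultdict

def hits_threshold(statuses, expected, threshold):
    # True once the most recent `threshold` observations all equal `expected`
    return len(statuses) >= threshold and all(s == expected for s in statuses[-threshold:])

url_groups = defaultdict(list)
for status in ('down', 'down', 'down'):
    url_groups['icmp://11.0.1.14'].append(status)

# With consecutive_down set to 3, the alert would fire on the third consecutive 'down'
print(hits_threshold(url_groups['icmp://11.0.1.14'], 'down', 3))  # True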

7. Metricbeat alerting

Metricbeat exposes many metrics you can alert on, such as CPU, memory and filesystem usage. The Python code is as follows:

import logging
from logging.handlers import RotatingFileHandler
from kafka import KafkaConsumer, KafkaProducer
import yaml
import json


class KafkaAlertProcessor:
    def __init__(self, config_path):
        # Configure logging to both the console and a file
        self.configure_logging()

        with open(config_path, 'r', encoding='utf-8') as config_file:
            config = yaml.safe_load(config_file)

        self.kafka_brokers = config['kafka']['brokers']
        self.input_topic = config['kafka']['input_topic']
        self.output_topic = config['kafka']['output_topic']
        self.group_id = config['kafka']['group_id']
        self.cpu_alert_threshold = config['alert_thresholds']['cpu']
        self.memory_alert_threshold = config['alert_thresholds']['memory']
        self.filesystem_alert_threshold = config['alert_thresholds']['filesystem']
        self.common_template = config['alert_templates']['common']
        self.cpu_alert_template = config['alert_templates']['cpu']
        self.memory_alert_template = config['alert_templates']['memory']
        self.filesystem_alert_template = config['alert_templates']['filesystem']

        self.consumer = None
        self.producer = None

    @staticmethod
    def configure_logging():
        # Configure logging to both the console and a file
        logger = logging.getLogger('')
        logger.setLevel(logging.INFO)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        # Size-based rotating file handler: keep at most 10 files of up to 1 MB each
        file_handler = RotatingFileHandler('metricbeat_alarm.log', maxBytes=1000000, backupCount=10,
                                           delay=False)
        file_handler.setLevel(logging.INFO)

        # Log format
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        file_handler.setFormatter(formatter)

        # Attach the handlers to the logger
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)

    def initialize_consumer(self):
        self.consumer = KafkaConsumer(
            self.input_topic,
            group_id=self.group_id,
            bootstrap_servers=','.join(self.kafka_brokers),
            auto_offset_reset='latest',
            enable_auto_commit=False,
        )

    def initialize_producer(self):
        self.producer = KafkaProducer(
            bootstrap_servers=','.join(self.kafka_brokers),
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
        )

    def process_alert(self, data):
        cmdb_data = data.get("cmdb_data", {})
        common_message = self.common_template.format(
            cmdb_data_eip=cmdb_data.get("eip", ""),
            cmdb_data_app_name=cmdb_data.get("app_name", ""),
            cmdb_data_module=cmdb_data.get("module", ""),
            cmdb_data_ops_user=cmdb_data.get("ops_user", ""),
            cmdb_data_ops_tel=cmdb_data.get("ops_tel", ""),
            cmdb_data_ops_dep=cmdb_data.get("ops_dep", "")

        )
        # Check CPU usage (averaged across cores)
        if "system" in data and "cpu" in data["system"] and "total" in data["system"]["cpu"] and "pct" in \
                data["system"]["cpu"]["total"] and "cores" in data["system"]["cpu"]:
            cpu_usage = data["system"]["cpu"]["total"]["pct"] / data["system"]["cpu"]["cores"]
            if cpu_usage > self.cpu_alert_threshold:
                cpu_alert_message = self.cpu_alert_template.format(
                    cpu_usage=cpu_usage
                )
                alert_message = cpu_alert_message + common_message
                self.send_alert("CPU 告警", alert_message)

        # Check memory usage
        if "system" in data and "memory" in data["system"] and "actual" in data["system"]["memory"] and "used" in \
                data["system"]["memory"]['actual'] and 'pct' in data["system"]["memory"]['actual']['used']:
            memory_usage = data["system"]["memory"]["actual"]["used"]["pct"]
            if memory_usage > self.memory_alert_threshold:
                memory_alert_message = self.memory_alert_template.format(
                    memory_actual_used_pct=memory_usage
                )
                alert_message = memory_alert_message + common_message
                self.send_alert("内存告警", alert_message)

        # Check filesystem usage
        if "system" in data and "filesystem" in data["system"] and "used" in data["system"]["filesystem"] and "pct" in \
                data["system"]["filesystem"]["used"]:
            filesystem_usage = data["system"]["filesystem"]["used"]["pct"]
            if filesystem_usage > self.filesystem_alert_threshold:
                fs_alert_message = self.filesystem_alert_template.format(
                    filesystem_used_pct=filesystem_usage,
                    filesystem_mount_point=data["system"]["filesystem"].get("mount_point", "Unknown")
                )
                alert_message = fs_alert_message + common_message
                self.send_alert("文件系统告警", alert_message)

    def send_alert(self, alert_type, alert_message):
        formatted_message = f"{alert_type} - {alert_message}"
        logging.warning(formatted_message)
        self.producer.send(self.output_topic,
                           value={"alert_type": alert_type, "alert_message": formatted_message})

    def run(self):
        self.initialize_consumer()
        self.initialize_producer()

        try:
            for msg in self.consumer:
                try:
                    data = json.loads(msg.value.decode('utf-8'))
                    self.process_alert(data)
                except json.JSONDecodeError as e:
                    logging.error(f"解码 JSON 错误: {e}")

        except KeyboardInterrupt:
            pass
        finally:
            if self.consumer:
                self.consumer.close()


if __name__ == "__main__":
    kafka_alert_processor = KafkaAlertProcessor(config_path="application.yml")
    kafka_alert_processor.run()

Its application.yml:

kafka:
  brokers:
    - "11.0.1.11:9092"
    - "11.0.1.12:9092"
    - "11.0.1.13:9092"
  input_topic: "ELK-system_metricbeat"
  output_topic: "ELK-alarm"
  group_id: "ELK-alarm"

alert_thresholds:
  cpu: 0.01
  memory: 0.1
  filesystem: 0.1

alert_templates:
  common: "IP:{cmdb_data_eip}\n系统名称:{cmdb_data_app_name}\n模块:{cmdb_data_module}\n运维责任人:{cmdb_data_ops_user}\n电话:{cmdb_data_ops_tel}\n责任部门:{cmdb_data_ops_dep}\n"
  cpu: "【CPU使用率告警】\n告警信息:CPU 使用率 超过阈值。当前平均值:{cpu_usage}.\n"
  memory: "【内存使用率告警】\n告警信息:内存 使用率 超过阈值。当前值:{memory_actual_used_pct}.\n"
  filesystem: "【文件系统使用率告警】\n告警信息:文件系统 使用率 超过阈值。当前值:{filesystem_used_pct}. 挂载点:{filesystem_mount_point}.\n"

The alerts it produces look like this:

【内存使用率告警】
告警信息:内存 使用率 超过阈值。当前值:0.1502.
IP:11.0.1.14
系统名称:应用系统
模块:demo1
运维责任人:运维仔
电话:12345678901
责任部门:运维部

【文件系统使用率告警】
告警信息:文件系统 使用率 超过阈值。当前值:0.1178. 挂载点:/.
IP:11.0.1.14
系统名称:应用系统
模块:demo1
运维责任人:运维仔
电话:12345678901
责任部门:运维部

【CPU使用率告警】
告警信息:CPU 使用率 超过阈值。当前平均值:0.002.
IP:11.0.1.14
系统名称:应用系统
模块:demo1
运维责任人:运维仔
电话:12345678901
责任部门:运维部

At the moment I send the alert messages to Kafka, the console and a log file; depending on your needs, you can instead push them to an HTTP API, Redis or a database, as in the sketch below.
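
As an example, here is a minimal sketch of forwarding the same payload to an HTTP endpoint; the webhook URL and the requests dependency are assumptions and not part of the project:

import logging
import requests

def send_alert_to_webhook(alert_type, alert_message,
                          webhook_url="http://11.0.1.11:8080/alerts"):
    # Hypothetical alert gateway; replace with your own API, WeCom/DingTalk webhook, Redis writer, etc.
    payload = {"alert_type": alert_type, "alert_message": alert_message}
    try:
        resp = requests.post(webhook_url, json=payload, timeout=5)
        resp.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to deliver alert to webhook: {e}")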

8. Alert recovery
Still a work in progress.
