智能运维实战:5步打通Prometheus告警→DeepSeek解析→微信通知,效率飙升90%
一、引言
在现代的监控系统中,Prometheus 作为一款开源的监控和告警工具,被广泛应用于各种场景。当系统出现异常时,Prometheus 能够及时触发告警,帮助运维人员快速发现问题。然而,告警信息往往只是简单地告知问题的发生,对于问题的具体原因和处理建议,运维人员还需要花费时间去分析。为了提高运维效率,我们可以借助大语言模型 DeepSeek 对 Prometheus 告警进行自动解析,并将解析结果和处理建议发送到微信,让运维人员能够第一时间了解问题的全貌和解决方案。
二、整体架构
整个系统的架构主要由以下几个部分组成:
- Prometheus:负责监控系统的各项指标,当指标超过预设的阈值时,触发告警。
- Alertmanager:接收 Prometheus 发送的告警信息,并根据配置的规则进行分组、抑制、路由等处理,然后将告警信息发送到指定的 Webhook 地址。
- Flask 应用:作为 Webhook 服务,接收 Alertmanager 发送的告警信息,调用 DeepSeek API 对告警进行解析,并将解析结果和处理建议发送到企业微信。
- DeepSeek API:利用大语言模型的能力,对 Prometheus 告警信息进行解析,提供问题的分析和处理建议。
- 企业微信:作为消息接收端,接收 Flask 应用发送的告警解析和处理建议,方便运维人员及时查看。
三、详细实现步骤
默认已部署好Prometheus+alertmanager环境,并配置好告警消息发送到企业微信。接下来需要写一个webhook程序,alertmanager发送告警时发送给企业微信并发送给webhook程序,webhook程序接收到告警后将告警发送给deepseek解析,再将deepseek的回答发送到企业微信。
3.1 编写webhook代码
import logging
from flask import Flask, request, jsonify
import requests
import json
from openai import OpenAI
logging.basicConfig(
filename='/opt/prometheus/alertmanager/wechat-with-deepseek.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
app = Flask(__name__)
WECHAT_CORP_ID = 'xxxxxxx'
WECHAT_TO_PARTY = 'x'
WECHAT_AGENT_ID = 'xxxxxx'
WECHAT_API_SECRET = 'xxxxxxxxxxxxxxxxxxxxx'
def call_deepseek(alert):
logging.info("开始调用 DeepSeek API 解析告警信息")
try:
DEEPSEEK_API_KEY = xxxxxxxxxxxxxxxxxxxxxxxxxxx
DEEPSEEK_BASE_URL = "https://api.lkeap.cloud.tencent.com/v1"
client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL
)
message_content = "请解析这个 Prometheus 告警,并给出处理建议:%s,请尽量精简表述。" % json.dumps(alert)
chat_completion = client.chat.completions.create(
model="deepseek-r1",
messages=[
{
"role": "user",
"content": message_content
}
]
)
logging.info("DeepSeek API 调用成功,获取到解析结果")
return chat_completion.choices[0].message.content
except Exception as e:
logging.error(f"DeepSeek API 调用出错: {str(e)}")
return None
def get_wechat_access_token():
logging.info("开始获取企业微信访问令牌")
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (WECHAT_CORP_ID, WECHAT_API_SECRET)
response = requests.get(url)
result = response.json()
if result.get('errcode') == 0:
logging.info("成功获取企业微信访问令牌")
return result.get('access_token')
else:
logging.error(f"获取企业微信访问令牌失败: {result.get('errmsg')}")
return None
def send_wechat_message(message):
logging.info("开始发送企业微信消息")
access_token = get_wechat_access_token()
if access_token:
url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % access_token
headers = {"Content-Type": "application/json"}
response = requests.post(url, headers=headers, data=json.dumps(message))
result = response.json()
if result.get('errcode') == 0:
logging.info("企业微信消息发送成功")
else:
logging.error(f"企业微信消息发送失败: {result.get('errmsg')}")
return result
return None
@app.route('/handle_alert', methods=['POST'])
def handle_alert():
try:
logging.info("接收到 Alertmanager 发送的告警信息")
alert_data = request.get_json()
# 判断告警内容开头,若为 [RESOLVED] 则不处理
if 'alerts' in alert_data and alert_data['alerts']:
first_alert = alert_data['alerts'][0]
if 'annotations' in first_alert and'summary' in first_alert['annotations']:
summary = first_alert['annotations']['summary']
if summary.startswith('[RESOLVED]'):
logging.info("接收到告警恢复通知,不做处理")
return jsonify({"status": "ignored", "message": "告警恢复通知,不做处理"})
analysis = call_deepseek(alert_data)
if analysis:
message_content = f"deepseek 告警解析与处理建议:
{analysis}"
wechat_message = {
"touser": "@all",
"toparty": WECHAT_TO_PARTY,
"msgtype": "text",
"agentid": WECHAT_AGENT_ID,
"text": {
"content": message_content
},
"safe": 0
}
send_result = send_wechat_message(wechat_message)
if send_result and send_result.get("errcode") == 0:
logging.info("告警信息处理并发送到企业微信成功")
return jsonify({"status": "success"})
else:
logging.error("告警信息处理并发送到企业微信失败")
return jsonify({"status": "failed", "message": send_result})
logging.error("DeepSeek 解析失败,无法发送消息到企业微信")
return jsonify({"status": "failed", "message": "DeepSeek 解析失败"})
except Exception as e:
logging.error(f"处理告警信息时出现异常: {str(e)}")
return jsonify({"status": "failed", "message": str(e)})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
3.2 代码解释
call_deepseek
函数:该函数接收 Prometheus 告警信息的 JSON 数据,首先从环境变量中获取 DeepSeek API 密钥,然后调用 DeepSeek API 进行解析。它将告警信息组织成特定的格式发送给 DeepSeek,获取解析结果后返回。如果 API 调用出错,则记录错误日志并返回None
。get_wechat_access_token
函数:通过企业微信提供的接口,使用企业 ID 和应用密钥获取访问令牌。如果获取成功,记录日志并返回令牌;否则,记录错误日志并返回None
。send_wechat_message
函数:接收要发送的消息内容,先获取企业微信访问令牌,然后将消息发送到企业微信。根据发送结果记录相应的日志,并返回发送结果。handle_alert
函数:这是 Flask 应用的核心函数,负责接收 Alertmanager 发送的告警信息。首先记录接收到告警的日志,然后获取 JSON 格式的告警数据。接着,判断告警是否为恢复通知(以[RESOLVED]
开头),若是则不做处理并返回相应信息。否则,调用call_deepseek
函数进行解析,根据解析结果构建企业微信消息并发送。最后,根据消息发送结果返回相应的状态信息。
3.3 运行webhook程序
nohup python3.9 /opt/prometheus/alertmanager/wechat-with-deepseek.py > /opt/prometheus/alertmanager/wechat-with-deepseek.log 2>&1 &
ps -aux | grep deepseek
3.4 alertmanager配置
receiver: 'wechat'
routes:
- match_re:
job: .*
receiver: 'wechat'
repeat_interval: 1h
continue: true
- match_re:
job: .*
receiver: 'wechat-with-deepseek'
repeat_interval: 1h
receivers:
- name: 'wechat-with-deepseek'
webhook_configs:
- url: 'http://127.0.0.1:5000/handle_alert'
send_resolved: false