基于Nuclei的自动化漏洞监测告警平台
一、项目背景
在SRC漏洞挖掘和日常渗透测试中,需要持续监控目标资产的安全状态。手动逐个扫描效率低,且容易遗漏。本项目基于Nuclei漏洞扫描引擎,结合企业微信机器人Webhook,实现自动化扫描+实时告警推送,发现高危漏洞立即通知,大幅提升漏洞发现效率。
同时搭建CVE-POC情报监控系统,自动抓取GitHub最新发布的漏洞利用代码,第一时间获取0day情报。
二、技术架构
三、环境准备
3.1 服务器环境
Ubuntu 22.04 / Debian 12
Python3 + requests库
Nuclei扫描引擎
# 安装Nuclei go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest nuclei -up # 更新模板 # 安装Python依赖 pip3 install requests3.2 企业微信机器人配置
进入企业微信群聊 → 右上角「···」→「群机器人」
添加机器人 → 自定义 → 名称填「漏洞监控」
在消息推送中添加
复制Webhook地址,格式如下:
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx四、核心功能一:漏洞扫描告警
4.1 完整扫描脚本
#!/usr/bin/env python3 import requests import json import subprocess import sys import re from datetime import datetime WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的Key" def send_wechat(title, content): """推送消息到企业微信""" data = { "msgtype": "markdown", "markdown": {"content": content} } try: resp = requests.post(WEBHOOK, json=data, timeout=10) return resp.json().get("errcode") == 0 except Exception as e: print(f"[-] 推送失败: {e}") return False def get_cve_link(name): """提取CVE编号并生成NVD链接""" cve_match = re.search(r'CVE-(\d{4})-(\d+)', name) if cve_match: return f"https://nvd.nist.gov/vuln/detail/CVE-{cve_match.group(1)}-{cve_match.group(2)}" return None def scan_single(target): """扫描单个目标""" print(f"\n[*] 扫描: {target}") cmd = [ "nuclei", "-u", target, "-s", "critical,high,medium", "-o", "/tmp/nuclei_result.json", "-jsonl", "-c", "50", "-timeout", "10" ] try: subprocess.run(cmd, capture_output=True, text=True, timeout=1800) except subprocess.TimeoutExpired: print("[-] 扫描超时") return [] alerts = [] try: with open("/tmp/nuclei_result.json", "r") as f: for line in f: if not line.strip(): continue data = json.loads(line) info = data.get("info", {}) severity = info.get("severity", "unknown") if severity not in ["critical", "high", "medium"]: continue name = info.get("name", "未知漏洞") host = data.get("host", target) matched = data.get("matched-at", "") template_id = data.get("template-id", "") description = info.get("description", "") reference = info.get("reference", []) emoji = "🔴" if severity == "critical" else "🟠" if severity == "high" else "🟡" cve_link = get_cve_link(name) alert = f"{emoji} **[{severity.upper()}]** {name}\n" alert += f"> 目标: `{host}`\n" if matched: alert += f"> 命中路径: `{matched}`\n" if template_id: alert += f"> 模板ID: `{template_id}`\n" if description: alert += f"> 描述: {description}\n" if cve_link: alert += f"> CVE详情: [点击查看]({cve_link})\n" if reference and len(reference) > 0: ref = reference[0] if isinstance(reference, list) else reference alert += f"> 参考: [漏洞详情]({ref})\n" alerts.append(alert) except Exception as e: print(f"[-] 解析出错: {e}") return alerts def push_results(targets, all_alerts): """汇总并推送扫描结果""" if all_alerts: content = f"## 🚨 漏洞告警 - 共 {len(all_alerts)} 个\n\n" content += "\n---\n\n".join(all_alerts) content += f"\n\n---\n\n**目标数:** {len(targets)}\n" content += f"**时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" content += f"**引擎:** Nuclei" send_wechat("漏洞告警", content) print(f"\n[+] 发现 {len(all_alerts)} 个漏洞,已推送") else: content = f"## ✅ 扫描完成\n\n**目标数:** {len(targets)}\n" content += f"**结果:** 未发现漏洞\n" content += f"**时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" send_wechat("扫描完成", content) print("\n[+] 未发现漏洞") if __name__ == "__main__": if len(sys.argv) < 2: print("用法:") print(" 单目标: python3 scan_alert.py http://example.com") print(" 批量: python3 scan_alert.py -f targets.txt") sys.exit(1) arg = sys.argv[1] if arg == "-f": if len(sys.argv) < 3: print("[-] 请指定目标列表文件") sys.exit(1) with open(sys.argv[2], "r") as f: targets = [l.strip() for l in f if l.strip() and not l.startswith("#")] else: targets = [arg] print(f"[*] 共 {len(targets)} 个目标") all_alerts = [] for target in targets: all_alerts.extend(scan_single(target)) push_results(targets, all_alerts)4.2 目标列表格式(targets.txt)
# 每行一个目标,#开头为注释 http://127.0.0.1:8080 http://testphp.vulnweb.com https://target.example.com五、使用方法
5.1 单目标扫描
python3 scan_alert.py http://127.0.0.1:80805.2 批量扫描
python3 scan_alert.py -f targets.txt5.3 定时自动扫描(crontab)
# 每天凌晨2点自动扫描 0 2 * * * cd /home/HH/vuln-monitor && python3 scan_alert.py -f targets.txt >> /var/log/vuln_scan.log 2>&1六、核心功能二:CVE-POC情报监控
5.1 监控脚本
#!/usr/bin/env python3 import requests import json import re from datetime import datetime, timedelta # 企业微信Webhook地址(请替换为自己的) WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的Key" def send_wechat(title, content): data = { "msgtype": "markdown", "markdown": {"content": content} } try: resp = requests.post(WEBHOOK, json=data, timeout=10) return resp.json().get("errcode") == 0 except Exception as e: print(f"[-] 推送失败: {e}") return False def get_recent_cve_poc(): """获取GitHub上最近24小时发布的CVE-POC""" yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") urls = [ f"https://api.github.com/search/repositories?q=CVE+POC+created:>{yesterday}&sort=updated&order=desc", f"https://api.github.com/search/repositories?q=exploit+CVE+created:>{yesterday}&sort=updated&order=desc" ] results = [] for url in urls: try: resp = requests.get(url, timeout=15) if resp.status_code == 200: data = resp.json() for item in data.get("items", [])[:5]: repo = { "name": item.get("name", ""), "url": item.get("html_url", ""), "desc": item.get("description", "")[:100], "stars": item.get("stargazers_count", 0), "lang": item.get("language", "Unknown"), "updated": item.get("updated_at", "")[:10] } if not any(r["url"] == repo["url"] for r in results): results.append(repo) except Exception as e: print(f"[-] 请求失败: {e}") return results def check_cve_details(repo_name): """尝试提取CVE编号""" cve_match = re.search(r'CVE-(\d{4})-(\d+)', repo_name, re.I) if cve_match: return f"https://nvd.nist.gov/vuln/detail/CVE-{cve_match.group(1)}-{cve_match.group(2)}" return None def main(): print(f"[*] 开始监控CVE-POC: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") repos = get_recent_cve_poc() if not repos: print("[+] 未发现新的CVE-POC") return print(f"[+] 发现 {len(repos)} 个新POC") content = f"## 🔥 漏洞情报监控 - {len(repos)} 个新POC\n\n" for repo in repos: cve_link = check_cve_details(repo["name"]) content += f"### 📌 {repo['name']}\n" content += f"> ⭐ Stars: {repo['stars']} | 语言: {repo['lang']} | 更新: {repo['updated']}\n" if repo["desc"]: content += f"> 描述: {repo['desc']}\n" content += f"> GitHub: [点击查看]({repo['url']})\n" if cve_link: content += f"> NVD详情: [CVE详情]({cve_link})\n" content += "\n---\n\n" content += f"**监控时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" content += "**来源:** GitHub CVE-POC监控" send_wechat("漏洞情报", content) print("[+] 已推送到企业微信") if __name__ == "__main__": main()5.2 定时任务设置
crontab -e 0 */6 * * * cd /你的路径/vuln-monitor && python3 cve_monitor.py >> /var/log/cve_monitor.log 2>&1