当前位置: 首页 > news >正文

基于Nuclei的自动化漏洞监测告警平台

一、项目背景

在SRC漏洞挖掘和日常渗透测试中,需要持续监控目标资产的安全状态。手动逐个扫描效率低,且容易遗漏。本项目基于Nuclei漏洞扫描引擎,结合企业微信机器人Webhook,实现自动化扫描+实时告警推送,发现高危漏洞立即通知,大幅提升漏洞发现效率。

同时搭建CVE-POC情报监控系统,自动抓取GitHub最新发布的漏洞利用代码,第一时间获取0day情报。

二、技术架构

三、环境准备

3.1 服务器环境
Ubuntu 22.04 / Debian 12
Python3 + requests库
Nuclei扫描引擎

# 安装Nuclei go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest nuclei -up # 更新模板 # 安装Python依赖 pip3 install requests

3.2 企业微信机器人配置
进入企业微信群聊 → 右上角「···」→「群机器人」
添加机器人 → 自定义 → 名称填「漏洞监控」

在消息推送中添加
复制Webhook地址,格式如下:

https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx

四、核心功能一:漏洞扫描告警

4.1 完整扫描脚本

#!/usr/bin/env python3 import requests import json import subprocess import sys import re from datetime import datetime WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的Key" def send_wechat(title, content): """推送消息到企业微信""" data = { "msgtype": "markdown", "markdown": {"content": content} } try: resp = requests.post(WEBHOOK, json=data, timeout=10) return resp.json().get("errcode") == 0 except Exception as e: print(f"[-] 推送失败: {e}") return False def get_cve_link(name): """提取CVE编号并生成NVD链接""" cve_match = re.search(r'CVE-(\d{4})-(\d+)', name) if cve_match: return f"https://nvd.nist.gov/vuln/detail/CVE-{cve_match.group(1)}-{cve_match.group(2)}" return None def scan_single(target): """扫描单个目标""" print(f"\n[*] 扫描: {target}") cmd = [ "nuclei", "-u", target, "-s", "critical,high,medium", "-o", "/tmp/nuclei_result.json", "-jsonl", "-c", "50", "-timeout", "10" ] try: subprocess.run(cmd, capture_output=True, text=True, timeout=1800) except subprocess.TimeoutExpired: print("[-] 扫描超时") return [] alerts = [] try: with open("/tmp/nuclei_result.json", "r") as f: for line in f: if not line.strip(): continue data = json.loads(line) info = data.get("info", {}) severity = info.get("severity", "unknown") if severity not in ["critical", "high", "medium"]: continue name = info.get("name", "未知漏洞") host = data.get("host", target) matched = data.get("matched-at", "") template_id = data.get("template-id", "") description = info.get("description", "") reference = info.get("reference", []) emoji = "🔴" if severity == "critical" else "🟠" if severity == "high" else "🟡" cve_link = get_cve_link(name) alert = f"{emoji} **[{severity.upper()}]** {name}\n" alert += f"> 目标: `{host}`\n" if matched: alert += f"> 命中路径: `{matched}`\n" if template_id: alert += f"> 模板ID: `{template_id}`\n" if description: alert += f"> 描述: {description}\n" if cve_link: alert += f"> CVE详情: [点击查看]({cve_link})\n" if reference and len(reference) > 0: ref = reference[0] if isinstance(reference, list) else reference alert += f"> 参考: [漏洞详情]({ref})\n" alerts.append(alert) except Exception as e: print(f"[-] 解析出错: {e}") return alerts def push_results(targets, all_alerts): """汇总并推送扫描结果""" if all_alerts: content = f"## 🚨 漏洞告警 - 共 {len(all_alerts)} 个\n\n" content += "\n---\n\n".join(all_alerts) content += f"\n\n---\n\n**目标数:** {len(targets)}\n" content += f"**时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" content += f"**引擎:** Nuclei" send_wechat("漏洞告警", content) print(f"\n[+] 发现 {len(all_alerts)} 个漏洞,已推送") else: content = f"## ✅ 扫描完成\n\n**目标数:** {len(targets)}\n" content += f"**结果:** 未发现漏洞\n" content += f"**时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" send_wechat("扫描完成", content) print("\n[+] 未发现漏洞") if __name__ == "__main__": if len(sys.argv) < 2: print("用法:") print(" 单目标: python3 scan_alert.py http://example.com") print(" 批量: python3 scan_alert.py -f targets.txt") sys.exit(1) arg = sys.argv[1] if arg == "-f": if len(sys.argv) < 3: print("[-] 请指定目标列表文件") sys.exit(1) with open(sys.argv[2], "r") as f: targets = [l.strip() for l in f if l.strip() and not l.startswith("#")] else: targets = [arg] print(f"[*] 共 {len(targets)} 个目标") all_alerts = [] for target in targets: all_alerts.extend(scan_single(target)) push_results(targets, all_alerts)

4.2 目标列表格式(targets.txt)

# 每行一个目标,#开头为注释 http://127.0.0.1:8080 http://testphp.vulnweb.com https://target.example.com

五、使用方法

5.1 单目标扫描

python3 scan_alert.py http://127.0.0.1:8080

5.2 批量扫描

python3 scan_alert.py -f targets.txt

5.3 定时自动扫描(crontab)

# 每天凌晨2点自动扫描 0 2 * * * cd /home/HH/vuln-monitor && python3 scan_alert.py -f targets.txt >> /var/log/vuln_scan.log 2>&1

六、核心功能二:CVE-POC情报监控

5.1 监控脚本

#!/usr/bin/env python3 import requests import json import re from datetime import datetime, timedelta # 企业微信Webhook地址(请替换为自己的) WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的Key" def send_wechat(title, content): data = { "msgtype": "markdown", "markdown": {"content": content} } try: resp = requests.post(WEBHOOK, json=data, timeout=10) return resp.json().get("errcode") == 0 except Exception as e: print(f"[-] 推送失败: {e}") return False def get_recent_cve_poc(): """获取GitHub上最近24小时发布的CVE-POC""" yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") urls = [ f"https://api.github.com/search/repositories?q=CVE+POC+created:>{yesterday}&sort=updated&order=desc", f"https://api.github.com/search/repositories?q=exploit+CVE+created:>{yesterday}&sort=updated&order=desc" ] results = [] for url in urls: try: resp = requests.get(url, timeout=15) if resp.status_code == 200: data = resp.json() for item in data.get("items", [])[:5]: repo = { "name": item.get("name", ""), "url": item.get("html_url", ""), "desc": item.get("description", "")[:100], "stars": item.get("stargazers_count", 0), "lang": item.get("language", "Unknown"), "updated": item.get("updated_at", "")[:10] } if not any(r["url"] == repo["url"] for r in results): results.append(repo) except Exception as e: print(f"[-] 请求失败: {e}") return results def check_cve_details(repo_name): """尝试提取CVE编号""" cve_match = re.search(r'CVE-(\d{4})-(\d+)', repo_name, re.I) if cve_match: return f"https://nvd.nist.gov/vuln/detail/CVE-{cve_match.group(1)}-{cve_match.group(2)}" return None def main(): print(f"[*] 开始监控CVE-POC: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") repos = get_recent_cve_poc() if not repos: print("[+] 未发现新的CVE-POC") return print(f"[+] 发现 {len(repos)} 个新POC") content = f"## 🔥 漏洞情报监控 - {len(repos)} 个新POC\n\n" for repo in repos: cve_link = check_cve_details(repo["name"]) content += f"### 📌 {repo['name']}\n" content += f"> ⭐ Stars: {repo['stars']} | 语言: {repo['lang']} | 更新: {repo['updated']}\n" if repo["desc"]: content += f"> 描述: {repo['desc']}\n" content += f"> GitHub: [点击查看]({repo['url']})\n" if cve_link: content += f"> NVD详情: [CVE详情]({cve_link})\n" content += "\n---\n\n" content += f"**监控时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" content += "**来源:** GitHub CVE-POC监控" send_wechat("漏洞情报", content) print("[+] 已推送到企业微信") if __name__ == "__main__": main()

5.2 定时任务设置

crontab -e 0 */6 * * * cd /你的路径/vuln-monitor && python3 cve_monitor.py >> /var/log/cve_monitor.log 2>&1

七、效果展示

http://www.jsqmd.com/news/866959/

相关文章:

  • PyTorch DataLoader 内存不足怎么办?教你一招避坑
  • Pikachu靶场搭建与Web渗透实战指南
  • 2026年5月最新太原黄金回收白银回收铂金回收权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 检测回收中心
  • Windows下curl报SEC_E_UNTRUSTED_ROOT的5种正确解决方法
  • DeepSeek API接入全链路实战:从注册到高并发部署的7个关键步骤
  • 魔兽争霸III终极优化指南:5步解决宽屏黑边、FPS限制与地图加载问题
  • 微信小程序wxapkg文件结构解析与源码还原实战
  • 2026年5月最新鹤壁黄金回收白银回收铂金回收权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 检测回收中心
  • 【LangGraph】House_Agent 实战(一):架构与环境配置
  • 从0到1的开源入门实战指南
  • 2026 北京本土口碑好 GEO 优化公司权威 TOP10 排名,含北京服务商选型指南 +FAQ - 资讯纵览
  • 服务器禁Ping实战指南:5种生产环境验证的ICMP过滤方法
  • Next.js授权绕过漏洞CVE-2025-29927深度解析
  • 2026年5月最新泰安黄金回收白银回收铂金回收权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 检测回收中心
  • Unity TextMeshPro中文与特殊字符显示为方块的终极解决方案
  • 2026年5月最新鹤岗黄金回收白银回收铂金回收权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 检测回收中心
  • Unity卡牌翻转与翻书效果实现原理与性能优化
  • 2026沧州灶台贴膜,专业团队这样选才靠谱 - 品牌企业推荐师(官方)
  • Next.js App Router权限绕过漏洞CVE-2025-29927深度解析
  • 宿迁黄金回收正规门店盘点|恒顺、金佑福领衔,全城 20 分钟可达 - 资讯纵览
  • 让老Mac焕发新生:OpenCore Legacy Patcher完整升级指南
  • 2026年5月最新泰州黄金回收白银回收铂金回收权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 检测回收中心
  • Windows热键冲突终极指南:如何用Hotkey Detective一键定位占用程序
  • 普宁月子中心收费标准|套餐里到底包含哪些项目 - 品牌观察
  • 对比直接使用与通过Taotoken调用大模型API的账单清晰度体验
  • doctype、charset、meta如何控制整个渲染流水线
  • Unity Addressables资源管理核心原理与热更实战
  • 2026年5月最新玉林黄金回收白银回收铂金回收权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 检测回收中心
  • 学生用户画像 - 考勤画像可视化分析
  • 2026年5月最新北海黄金回收白银回收铂金回收权威排行榜TOP5:纯金+金条+银条+钯金 门店地址联系方式推荐 - 检测回收中心