开源智能安全运营平台ASP:AI驱动的自动化告警分析与响应实战
1. 项目概述:一个为蓝队而生的智能安全运营平台
如果你是一名安全运营中心的分析师,每天面对海量、重复且低价值的告警,手动筛选、关联、研判,最后写报告,这个过程想必已经让你疲惫不堪。或者,你是一名安全工程师,正在为团队寻找一个既能自动化处理日常告警,又能灵活集成现有工具链,还能利用AI提升研判效率的平台,但市面上的商业SOAR要么太贵,要么太“重”,要么不够开放。今天要聊的这个开源项目Agentic SOC Platform,可能就是你在找的那个答案。
简单来说,ASP是一个以智能体为核心、高度可扩展的开源自动化安全运营平台。它不是一个简单的脚本集合,而是一个完整的工程化框架,旨在将安全运营中那些繁琐、重复的流程——从告警接入、AI辅助分析、事件分诊到自动化响应——串联起来,形成一个闭环。它的核心卖点在于“Agentic”,即智能体驱动。这意味着它内置了对LangChain、LangGraph、Dify等主流AI Agent框架的支持,允许你将大语言模型的能力无缝嵌入到安全分析工作流中,让AI成为你团队里不知疲倦的初级分析师。
我第一次接触这个项目时,最吸引我的是它的务实架构:用Webhook+Redis Stream处理实时告警流,用基于Nocoly的SIRP作为可视化事件管理前台,用Python模块化引擎驱动一切。这听起来技术栈很清晰,没有为了“炫技”而引入不必要的复杂性。接下来,我会结合自己的部署和测试经验,带你深入拆解ASP的架构设计、核心模块的运作原理、一步步的部署实操,以及那些官方文档里可能没写的“坑”和技巧。
2. 核心架构与设计哲学拆解
在深入代码之前,理解ASP的架构设计思路至关重要。这能帮助你在后续的定制开发或故障排查时,清楚地知道数据流向了哪里,以及为什么要这么设计。
2.1 分层架构与数据流
ASP的架构可以清晰地分为三层:接入层、处理层和呈现层。整个数据流是单向且解耦的,这保证了系统的稳定性和可扩展性。
接入层:这一层负责与外部世界对接。核心组件是Webhook Receiver。你的SIEM(如Splunk、Elastic SIEM)、EDR或任何能发送HTTP请求的安全工具,都可以通过配置Webhook,将告警事件以JSON格式推送到ASP的指定端点。这里的设计巧妙之处在于,它没有试图去适配所有SIEM的私有API,而是采用了Webhook这个几乎通用的标准,大大降低了集成门槛。你只需要在SIEM里配一个HTTP告警动作即可。
处理层:这是ASP的大脑,也是最具特色的部分。它进一步分为两个并行的处理引擎:
- 模块引擎:用于流式处理。Webhook接收到的告警会被立即推送到一个Redis Stream中。每个告警类型(你可以自定义)可以对应一个独立的Stream。然后,后台运行的“模块”会作为消费者,从指定的Stream中拉取告警消息进行处理。一个模块就是一个独立的Python处理单元,它可以调用AI模型进行告警摘要、风险评分,可以查询威胁情报进行IoC富化,也可以执行简单的逻辑判断。处理完成后,模块会将结果(一个标准化的安全记录)发送到下一站。
- 剧本引擎:用于交互式批处理。当安全分析师在SIRP界面上看到一个具体的事件、告警或线索时,他可以手动触发一个“剧本”。剧本是一系列更复杂、可能涉及多步骤交互的自动化动作,比如对一个可疑IP进行全链条调查(查Whois、查威胁情报、查历史活动),或者对一台失陷主机执行隔离操作。剧本的执行是由
PlaybookLoader引擎驱动的。
呈现层:即内置的SIRP平台。所有经过模块处理后的标准化记录,都会在这里被创建为安全事件、告警或证据链。SIRP基于Nocoly开发,提供了一个开箱即用的Web界面,用于事件管理、团队协作、报告生成。更重要的是,它作为人机交互界面,是分析师触发剧本操作的入口。
为什么选择Redis Stream?这是一个关键设计点。相比更常见的消息队列如RabbitMQ或Kafka,Redis Stream在ASP这个场景下有几个优势:一是轻量,部署简单,与ASP的Python技术栈集成顺畅;二是它支持消息的持久化和消费者组,能确保告警不丢失,并且允许多个模块实例并行处理同一个Stream以实现负载均衡;三是它的“Stream”数据结构非常贴合“事件流”这个概念,每个告警就是一个带时间戳的条目,便于追溯。
2.2 智能体集成模式解析
“Agentic”是项目的灵魂。ASP没有自己再造一个AI轮子,而是选择了拥抱生态,主要集成了两类框架:
LangChain/LangGraph模式:这是给开发者准备的。如果你熟悉LangChain,你可以编写自己的Chain或Agent,然后在ASP的模块中直接调用。LangGraph的引入尤其适合构建有状态、多步骤的复杂分析流程。例如,你可以设计一个Agent,它先分析告警描述,然后决定去查询哪几个威胁情报源,最后综合所有信息给出处置建议。ASP提供了一个运行环境,让你这些复杂的AI逻辑能作为一个服务持续运行。
Dify模式:这是给运营人员和安全分析师准备的。Dify是一个可视化AI工作流构建工具。ASP可以与Dify集成,意味着你可以通过拖拽的方式,在Dify中设计一个告警分析工作流(比如:输入告警文本 -> 调用GPT提取关键实体 -> 查询VirusTotal -> 生成研判报告),然后将这个工作流发布为一个API。ASP的模块只需要调用这个API即可。这大大降低了将AI能力融入安全流程的技术门槛。
在实际项目中如何选择?我的经验是:对于固定的、逻辑相对简单的AI任务(如文本分类、摘要),可以直接在ASP模块里用OpenAI API或本地LLM的SDK实现。对于需要频繁调整、探索性强的复杂分析流程,先用Dify快速原型化,验证效果后再考虑是否用LangGraph固化为代码。对于追求极致性能和控制的场景,则直接用LangChain/LangGraph开发原生模块。
2.3 扩展性设计:模块与插件
ASP的整个框架由Python写成,其扩展性体现在两个方面:
- 模块:这是核心处理单元。每个模块都是一个独立的Python类,继承自基类,需要实现
process等方法。官方提供了很多示例模块,比如调用OpenAI的、查询 AbuseIPDB 的。你可以像搭积木一样,编写自己的模块,放入modules目录,系统会自动加载。模块之间通过Redis Stream和内部事件总线通信,耦合度很低。 - 插件:更多用于扩展SIRP前端的功能,比如添加新的数据可视化组件、自定义报表格式等。
这种设计使得ASP不仅能接入各种安全设备,也能将内部的各种API(CMDB、工单系统、内部威胁情报)轻松集成进来,真正成为企业安全运营的“中枢神经”。
3. 从零开始:环境部署与核心配置实战
理论讲完了,我们动手把它跑起来。我会基于最常见的Linux服务器环境,带你走一遍从Docker部署到接入第一个告警的全过程。
3.1 基础环境准备
假设我们有一台干净的Ubuntu 22.04 LTS服务器。ASP强烈推荐使用Docker Compose部署,这能省去大量依赖管理的麻烦。
# 1. 更新系统并安装必要工具 sudo apt update && sudo apt upgrade -y sudo apt install -y docker.io docker-compose-v2 git curl # 2. 将当前用户加入docker组,避免每次都要sudo sudo usermod -aG docker $USER newgrp docker # 刷新组权限,或退出重新登录 # 3. 克隆项目代码 git clone https://github.com/funnywolf/agentic-soc-platform.git cd agentic-soc-platform3.2 Docker Compose部署与关键配置调整
项目根目录下已经提供了docker-compose.yml。但在启动前,有几个关键配置必须修改,否则可能会无法运行或存在安全风险。
首先,配置环境变量文件。复制示例文件并编辑:
cp .env.example .env vim .env # 或用你喜欢的编辑器以下是最需要关注的几个变量:
# AI相关配置 - 如果你暂时不用AI功能,可以先不配,但部分模块可能报错 OPENAI_API_KEY=sk-xxx # 如果用OpenAI OPENAI_BASE_URL=https://api.openai.com/v1 # 如果用的是Azure OpenAI或第三方代理,需修改 LOCAL_LLM_MODEL=llama3:latest # 如果使用本地Ollama等模型 LOCAL_LLM_BASE_URL=http://host.docker.internal:11434 # Ollama通常在本地11434端口 # Redis配置 - 生产环境务必修改密码! REDIS_PASSWORD=YourSuperStrongRedisPassword123! # 改成强密码 REDIS_STREAM_ALERT=alerts # 告警Stream的名称,可按需修改 # 数据库配置 POSTGRES_PASSWORD=YourStrongPostgresPass456! # 改成强密码 POSTGRES_USER=asp_user POSTGRES_DB=asp_db # Webhook接收器密钥 - 用于验证传入的Webhook请求,非常重要! WEBHOOK_SECRET_KEY=YourWebhookSecretKey789! # 生成一个随机字符串 # SIRP前端访问密钥 SIRP_ADMIN_API_KEY=AnotherSecretKeyForSIRP101112安全警告:
.env文件包含所有敏感信息。绝对不要将其提交到版本控制系统。在生产环境中,应考虑使用Docker Secrets或专门的密钥管理服务。
其次,检查Docker Compose文件。打开docker-compose.yml,确保所有服务的端口映射符合你的预期,并且没有不必要的端口暴露到公网。通常,我们只需要将SIRP的Web界面端口(如3000)和Webhook接收端口(如5000)映射出来。
3.3 启动服务与初始化
配置完成后,一键启动所有服务:
docker-compose up -d使用docker-compose logs -f可以查看实时日志,观察各容器启动是否正常。重点关注webhook-receiver、module-engine、sirp-web这几个服务。
首次启动后,需要初始化SIRP的数据库结构。通常SIRP容器在启动时会自动执行迁移。但为了保险,可以手动执行:
# 进入SIRP的容器执行数据库迁移 docker-compose exec sirp-web python manage.py migrate # 创建超级用户(用于登录SIRP后台) docker-compose exec sirp-web python manage.py createsuperuser按照提示输入用户名、邮箱和密码。这个账号将用于登录SIRP管理界面。
3.4 验证部署与初步访问
- 检查服务状态:访问
http://你的服务器IP:3000,应该能看到SIRP的登录界面。 - 登录SIRP:使用上一步创建的超级用户账号登录。
- 检查Webhook服务:访问
http://你的服务器IP:5000/health,应该返回一个简单的健康状态JSON。这证明Webhook接收器已就绪。 - 查看Redis数据:可以进入Redis容器,查看Stream是否创建。
docker-compose exec redis redis-cli -a YourSuperStrongRedisPassword123! > XINFO STREAMS alerts # 查看alerts stream的信息
如果以上步骤都成功,恭喜你,ASP平台的基础骨架已经搭建起来了。但它现在还是个空壳,没有处理逻辑。接下来我们要让它“活”起来。
4. 核心模块开发与AI能力集成实战
平台跑起来了,现在我们来打造它的“肌肉”——编写和配置处理模块。我们以一个最常见的场景为例:接收一个来自SIEM的暴力破解告警,调用AI模型分析其严重性,并自动创建SIRP事件。
4.1 理解模块的基本结构
在modules/目录下,官方提供了很多示例。我们创建一个新的模块文件modules/bruteforce_ai_analyzer.py。
一个最基础的模块需要包含以下部分:
import json import logging from typing import Dict, Any from asp_module_engine.base_module import BaseModule # 设置日志 logger = logging.getLogger(__name__) class BruteforceAIAnalyzer(BaseModule): """针对暴力破解告警的AI分析模块""" # 模块的元数据 name = "bruteforce_ai_analyzer" description = "使用LLM分析暴力破解告警,评估风险并富化信息。" version = "1.0.0" # 该模块监听的Redis Stream名称,必须与Webhook配置的target_stream一致 input_stream = "alerts" def __init__(self, config: Dict[str, Any]): super().__init__(config) # 初始化你的资源,比如AI客户端、情报API客户端等 self.llm_client = None # 这里后续会初始化 self.initialize_llm() def initialize_llm(self): """初始化LLM客户端。这里演示使用OpenAI API。""" try: # 从环境变量或配置中读取API Key api_key = self.config.get("openai_api_key") or os.getenv("OPENAI_API_KEY") base_url = self.config.get("openai_base_url") or os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") # 这里以openai库为例,实际可用langchain等 from openai import OpenAI self.llm_client = OpenAI(api_key=api_key, base_url=base_url) logger.info("LLM client initialized successfully.") except Exception as e: logger.error(f"Failed to initialize LLM client: {e}") self.llm_client = None async def process(self, message: Dict[str, Any]) -> Dict[str, Any]: """ 核心处理方法。从Redis Stream中消费一条消息,进行处理。 :param message: 从Stream中读取的原始消息数据 :return: 处理后的结果字典,通常会被发送到SIRP """ alert_data = message.get("data", {}) alert_id = alert_data.get("id", "unknown") logger.info(f"Processing bruteforce alert: {alert_id}") # 1. 提取关键信息 source_ip = alert_data.get("source_ip") target_user = alert_data.get("target_user", "unknown") attempt_count = alert_data.get("attempt_count", 1) raw_message = alert_data.get("message", "") # 2. 调用AI进行分析 analysis_result = await self.analyze_with_ai(raw_message, source_ip, attempt_count) # 3. 构建标准化的输出格式(SIRP期望的格式) output_record = { "type": "alert", "source": "bruteforce_ai_analyzer", "severity": analysis_result.get("severity", "medium"), "confidence": analysis_result.get("confidence", 0.7), "title": f"AI Analyzed Bruteforce: {source_ip} -> {target_user}", "description": analysis_result.get("summary"), "raw_data": alert_data, # 保留原始数据 "enriched_data": { "ai_analysis": analysis_result, "source_ip": source_ip, "target_user": target_user, "tags": ["bruteforce", "ai_analyzed"] + analysis_result.get("tags", []) }, "timestamp": alert_data.get("timestamp") # 使用原始告警时间 } logger.info(f"Alert {alert_id} processed. Severity: {output_record['severity']}") return output_record async def analyze_with_ai(self, raw_message: str, source_ip: str, attempt_count: int) -> Dict[str, Any]: """调用LLM分析告警""" if not self.llm_client: return {"summary": "LLM not available.", "severity": "medium", "confidence": 0.5} prompt = f""" 你是一名网络安全分析师。请分析以下暴力破解告警: 原始告警信息:{raw_message} 源IP地址:{source_ip} 尝试次数:{attempt_count} 请完成以下任务: 1. 用一句话总结该告警的核心风险。 2. 评估严重等级(critical, high, medium, low, info)。 3. 给出置信度(0到1之间的小数)。 4. 提供两到三个下一步调查或处置建议。 5. 生成几个相关的标签(tags)。 请以JSON格式回复,包含以下键:summary, severity, confidence, suggestions (列表), tags (列表)。 """ try: response = self.llm_client.chat.completions.create( model="gpt-3.5-turbo", # 或你配置的模型 messages=[{"role": "user", "content": prompt}], temperature=0.2, # 低温度,输出更稳定 response_format={ "type": "json_object" } # 要求返回JSON ) ai_output = json.loads(response.choices[0].message.content) return ai_output except Exception as e: logger.error(f"AI analysis failed: {e}") return { "summary": f"AI分析失败: {str(e)}", "severity": "medium", "confidence": 0.3, "suggestions": ["检查AI服务连接"], "tags": ["ai_failed"] }4.2 配置模块并使其生效
编写完模块后,需要告诉ASP引擎加载它。编辑config/modules.yaml(如果不存在则创建):
modules: - name: bruteforce_ai_analyzer class: modules.bruteforce_ai_analyzer.BruteforceAIAnalyzer enabled: true config: # 模块自身的配置,会传递给__init__中的config参数 openai_api_key: "{{ env.OPENAI_API_KEY }}" # 可以使用Jinja2模板引用环境变量 openai_base_url: "{{ env.OPENAI_BASE_URL }}" input_stream: alerts # 监听的Stream,与代码中一致关键一步:需要重启module-engine服务以加载新配置。
docker-compose restart module-engine通过docker-compose logs -f module-engine查看日志,确认新模块被成功加载,没有报错。
4.3 配置Webhook接收与路由
现在模块准备好了,我们需要让告警能流到这个模块。这需要配置Webhook接收器,告诉它收到某种类型的告警后,应该丢到哪个Redis Stream。
编辑config/webhook_routes.yaml:
routes: - name: siem_bruteforce_alerts path: "/webhook/siem/bruteforce" # Webhook的URL路径 method: "POST" target_stream: "alerts" # 将数据发送到'alerts' stream # 验证规则(可选但推荐) validation: secret_header: "X-Webhook-Secret" secret_value: "YourWebhookSecretKey789!" # 与.env中的WEBHOOK_SECRET_KEY一致 # 数据转换(可选):将不同SIEM的格式统一为ASP内部格式 transform: | function transform(payload) { // 假设SIEM发送的格式是 {“event”: {“src_ip”: “1.2.3.4”, “user”: “admin”, “count”: 10}} return { “id”: payload.event.id || Date.now().toString(), “source_ip”: payload.event.src_ip, “target_user”: payload.event.user, “attempt_count”: payload.event.count, “message”: `Bruteforce attempt from ${payload.event.src_ip} on user ${payload.event.user}`, “timestamp”: new Date().toISOString(), “raw_payload”: payload // 保留原始数据 }; }重启webhook-receiver服务:
docker-compose restart webhook-receiver至此,一个完整的处理流水线就配置好了:SIEM --Webhook--> ASP Webhook Receiver --(经转换)--> Redis Stream ‘alerts’ --被消费--> BruteforceAIAnalyzer模块 --输出--> SIRP
4.4 测试端到端流程
我们可以用curl模拟SIEM发送一个告警来测试整个链路:
curl -X POST http://localhost:5000/webhook/siem/bruteforce \ -H "Content-Type: application/json" \ -H "X-Webhook-Secret: YourWebhookSecretKey789!" \ -d '{ “event”: { “id”: “test_alert_001”, “src_ip”: “192.168.1.100”, “user”: “Administrator”, “count”: 15, “timestamp”: “2023-10-27T10:30:00Z” } }'然后,依次检查:
- Webhook日志:
docker-compose logs webhook-receiver应该显示收到请求并转发。 - 模块引擎日志:
docker-compose logs module-engine应该显示bruteforce_ai_analyzer模块处理了一条告警,并打印出AI分析的结果和严重性。 - SIRP界面:登录SIRP,在“告警”或“事件”页面,应该能看到一条新的记录,标题类似“AI Analyzed Bruteforce: 192.168.1.100 -> Administrator”,并且包含了AI生成的摘要和建议。
如果一切顺利,你就成功搭建了一个AI驱动的自动化告警分析流水线!
5. 生产环境部署的进阶考量与避坑指南
在测试环境跑通只是第一步。要将ASP用于生产,还需要考虑更多因素。以下是我在实际部署中积累的一些关键经验和常见问题的解决方案。
5.1 性能、高可用与监控
性能调优:
- Redis:生产环境务必为Redis配置持久化(AOF或RDB),并分配足够内存。如果告警量巨大,考虑使用Redis Cluster。
docker-compose.yml中的默认配置可能不够。 - 模块引擎:
module-engine默认是单进程。对于CPU密集型的模块(如复杂的AI推理),可以考虑启动多个实例,并利用Redis Stream的消费者组特性实现并行处理。这需要修改部署配置,可能要用到Kubernetes或更复杂的Docker Compose编排。 - 数据库:PostgreSQL是SIRP的后端。需要根据事件数量定期进行数据库维护(如清理旧数据、建立索引)。SIRP的管理界面可能自带一些维护任务。
- Redis:生产环境务必为Redis配置持久化(AOF或RDB),并分配足够内存。如果告警量巨大,考虑使用Redis Cluster。
高可用:核心在于无状态服务的多实例部署和有状态服务的备份。
- Webhook Receiver和Module Engine是无状态的,可以水平扩展,前面用负载均衡器(如Nginx)分发流量。
- Redis和PostgreSQL是有状态的,需要部署主从复制或集群方案。对于中小规模,可以使用云服务的托管数据库,它们通常内置了高可用。
- SIRP Web:也可以多实例部署,共享同一个数据库和Redis。
监控与日志:
- 应用日志:ASP各组件都输出标准日志到
stdout,被Docker捕获。生产环境应该使用docker-compose的日志驱动,将日志集中发送到ELK、Loki等日志平台。 - 业务指标:需要自己埋点或通过外部监控。关键指标包括:Webhook接收速率、各Stream的消息堆积数、各模块的处理延迟和错误率、SIRP的API响应时间。
- 健康检查:Docker Compose文件应该为每个服务配置
healthcheck,以便在容器不健康时自动重启或告警。
- 应用日志:ASP各组件都输出标准日志到
5.2 安全加固配置清单
开源项目默认配置往往不适用于生产。以下安全措施必须考虑:
网络隔离:
- 将ASP的所有服务部署在一个独立的Docker网络或Kubernetes命名空间中。
- 严格限制暴露的端口。通常只将SIRP Web的端口(如443,通过反向代理)和Webhook Receiver的端口(如5000)暴露给内部网络或特定的SIEM IP地址段。切勿将管理端口(如PostgreSQL的5432,Redis的6379)暴露给公网。
- 在Webhook Receiver前部署反向代理(如Nginx),并配置TLS终止、速率限制和基于IP的访问控制列表。
认证与授权:
- Webhook:必须使用
WEBHOOK_SECRET_KEY,并在所有Webhook请求头中验证。这是防止恶意数据注入的第一道防线。 - SIRP:充分利用其内置的用户角色权限系统。不要所有人都用超级管理员账号。为分析师、只读用户等创建不同角色的账号。
- API密钥:所有AI服务、威胁情报API的密钥都必须通过环境变量或密钥管理服务注入,绝不能硬编码在配置文件或代码中。
- Webhook:必须使用
数据安全:
- 定期备份PostgreSQL数据库和Redis的持久化文件。
- 考虑对数据库中的敏感字段(如原始告警中的密码哈希、个人信息)进行加密存储。
- 制定数据保留策略,定期归档或清理过期的告警和事件数据。
5.3 常见问题与排查实录
即使按照指南操作,你也可能会遇到一些问题。这里记录几个我踩过的“坑”:
问题一:Module Engine启动失败,报错“ModuleNotFoundError: No module named ‘xxx’”
- 原因:模块文件存在语法错误,或者模块依赖的Python包没有安装。
- 排查:
- 检查模块文件的Python语法:
python -m py_compile your_module.py。 - 查看
module-engine容器的日志,错误信息通常会指出具体是哪一行。 - 确认你的模块是否在
config/modules.yaml中正确注册,class的路径是否正确。 - 如果模块依赖第三方库,需要在
Dockerfile中为module-engine镜像添加安装命令,或者通过pip install在容器启动时安装。
- 检查模块文件的Python语法:
问题二:Webhook发送成功,但SIRP里看不到事件
- 原因:这是数据流中断的典型表现。需要分段排查。
- 排查步骤:
- 查Webhook接收器日志:
docker-compose logs webhook-receiver,确认请求是否被正确接收,状态码是否为200。如果返回4xx,检查路由配置和Secret验证。 - 查Redis:进入Redis容器,查看目标Stream是否有新消息:
XREAD COUNT 1 STREAMS alerts 0。 - 查Module Engine日志:确认对应的模块是否被触发,处理过程中是否有异常。模块处理后的输出会发送到SIRP的API,检查这里是否有网络或认证错误。
- 查SIRP API日志:SIRP的Web服务日志可能会记录创建事件时遇到的错误,比如数据格式不符合预期。
- 查Webhook接收器日志:
问题三:AI模块响应缓慢,导致Stream消息堆积
- 原因:调用外部LLM API(尤其是GPT-4)或本地大模型推理速度慢,成为性能瓶颈。
- 解决方案:
- 异步处理:确保模块的
process方法是async的,并且内部调用AI API时使用了异步客户端(如httpx.AsyncClient或openai.AsyncOpenAI)。这样可以在等待AI响应的同时处理其他消息。 - 批量处理:修改模块逻辑,一次从Stream中读取多条消息(
XREAD的COUNT参数),然后批量发送给AI API(如果API支持),可以显著提高吞吐量。 - 缓存:对于重复出现的源IP、哈希值等,可以将AI分析结果缓存到Redis中一段时间,避免对相同指标重复分析。
- 降级方案:在AI服务不可用或超时时,模块应能自动降级,使用基于规则的简单分析,保证流程不中断。
- 异步处理:确保模块的
问题四:自定义模块的配置不生效
- 原因:
config/modules.yaml中的配置没有正确传递给模块,或者模块代码中读取配置的方式不对。 - 检查:
- 模块的
__init__方法是否正确接收了config参数,并调用super().__init__(config)。 - 在模块内部,通过
self.config.get(‘key’)来读取配置。 - 确保YAML文件语法正确,缩进无误。可以使用在线YAML校验器检查。
- 重启
module-engine服务使配置生效。
- 模块的
- 原因:
6. 从自动化到智能化:构建复杂响应剧本
模块处理了告警,创建了事件。接下来,安全分析师在SIRP中查看事件时,可以手动触发更复杂的自动化操作,这就是“剧本”的用武之地。剧本比模块更侧重于交互式和多步骤的响应动作。
6.1 剧本的概念与编写
一个剧本本质上是一个Python脚本,它可以通过SIRP的UI被触发,并作用于一个具体的对象(事件、告警、资产等)。剧本可以执行诸如“隔离主机”、“查询全量威胁情报”、“生成处置报告并发送邮件”等操作。
剧本存放在playbooks/目录下。我们编写一个简单的剧本,用于对事件中的可疑IP进行全面的威胁情报查询。
创建playbooks/enrich_ip_threat_intel.py:
#!/usr/bin/env python3 import requests import json from typing import Dict, Any, List # 剧本的元信息,用于在SIRP UI中显示 METADATA = { “name”: “IP威胁情报富化”, “description”: “对事件中的IP地址进行多源威胁情报查询。”, “author”: “Your Team”, “version”: “1.0”, “supported_object_types”: [“case”, “alert”] # 可以针对哪些类型的对象触发 } # 需要用户输入的参数(会在SIRP UI中显示为表单) INPUT_SCHEMA = [ { “name”: “ip_address”, “type”: “string”, “description”: “要查询的IP地址”, “required”: True }, { “name”: “intel_sources”, “type”: “multiselect”, “description”: “选择要查询的情报源”, “options”: [“AbuseIPDB”, “VirusTotal”, “Shodan”, “GreyNoise”], “default”: [“AbuseIPDB”, “VirusTotal”], “required”: True } ] def run(playbook_input: Dict[str, Any], sirp_api_client, context: Dict[str, Any]) -> Dict[str, Any]: """ 剧本的主函数。 :param playbook_input: 用户通过UI输入的表单数据 :param sirp_api_client: 一个预配置好的客户端,用于与SIRP API交互(如更新事件) :param context: 上下文信息,如触发剧本的对象ID、类型等 :return: 一个结果字典,会显示在SIRP的剧本执行结果中 """ ip = playbook_input.get(“ip_address”) sources = playbook_input.get(“intel_sources”, []) if not ip: return {“success”: False, “error”: “未提供IP地址”} results = {“ip”: ip, “sources”: {}} # 1. 查询AbuseIPDB (示例) if “AbuseIPDB” in sources: abuseipdb_result = query_abuseipdb(ip) results[“sources”][“AbuseIPDB”] = abuseipdb_result # 2. 查询VirusTotal (示例) if “VirusTotal” in sources: vt_result = query_virustotal(ip) results[“sources”][“VirusTotal”] = vt_result # 3. 将结果作为“证据”添加到SIRP事件中 object_id = context.get(“object_id”) object_type = context.get(“object_type”) if object_id and object_type: artifact_data = { “type”: “threat_intel_report”, “data”: results, “description”: f”IP {ip} 的多源威胁情报查询结果”, “tags”: [“playbook”, “threat_intel”, “ip_enrichment”] } # 使用API客户端添加证据 try: sirp_api_client.create_artifact(object_type, object_id, artifact_data) results[“sirp_artifact_created”] = True except Exception as e: results[“sirp_artifact_error”] = str(e) # 4. 总结判断 malicious_indicators = 0 for source, data in results[“sources”].items(): if data.get(“is_malicious”): malicious_indicators += 1 results[“summary”] = { “is_suspicious”: malicious_indicators > 0, “malicious_source_count”: malicious_indicators, “recommendation”: “建议封锁此IP” if malicious_indicators > 1 else “建议持续监控” } return {“success”: True, “results”: results} def query_abuseipdb(ip: str) -> Dict[str, Any]: """模拟查询AbuseIPDB API""" # 实际使用时,需要配置API Key并从环境变量读取 api_key = os.getenv(“ABUSEIPDB_API_KEY”) if not api_key: return {“error”: “API key not configured”, “is_malicious”: False} # 这里是伪代码,实际需要调用requests库 # response = requests.get(f“https://api.abuseipdb.com/api/v2/check?ipAddress={ip}”, headers={...}) # return parse_response(response) # 模拟返回 return { “abuseConfidenceScore”: 85, “totalReports”: 12, “is_malicious”: True, “lastReportedAt”: “2023-10-26T08:30:00Z” } def query_virustotal(ip: str) -> Dict[str, Any]: """模拟查询VirusTotal API""" # 类似AbuseIPDB,需要API Key和实际调用 return { “harmless_votes”: 5, “malicious_votes”: 8, “suspicious_votes”: 2, “is_malicious”: True, “detected_urls”: [“http://malicious-site.com/”] }6.2 在SIRP中配置与触发剧本
剧本写好后,需要在SIRP的管理界面进行注册和配置。
放置剧本文件:将
enrich_ip_threat_intel.py放到ASP容器内的/app/playbooks目录(通常通过Docker volume映射)。重启PlaybookLoader服务:
docker-compose restart playbook-loader,使其加载新剧本。在SIRP UI中配置:
- 以管理员身份登录SIRP。
- 进入“设置”或“剧本管理”区域。
- 应该能看到系统自动扫描到的“IP威胁情报富化”剧本。
- 点击配置,可能需要输入各个情报源的API密钥(这些密钥可以存储在SIRP的密钥管理器中,剧本通过
sirp_api_client或环境变量获取)。 - 启用该剧本,并分配可以执行它的用户角色。
触发剧本:
- 分析师在SIRP中打开一个具体的事件或告警。
- 在详情页的“操作”或“剧本”选项卡中,会看到可用的剧本列表。
- 选择“IP威胁情报富化”,会弹出表单让你输入要查询的IP(通常可以自动填充当前事件中的IP),选择情报源。
- 点击执行,剧本会在后台运行。执行完成后,结果会显示在页面上,并且生成的新证据(Threat Intel Report)会自动关联到该事件。
通过这种方式,安全团队可以将那些需要多步骤、调用多个外部API的复杂调查动作,沉淀为可重复使用的“剧本”,极大提升事件响应的效率和一致性。
