当前位置: 首页 > news >正文

开源智能安全运营平台ASP:AI驱动的自动化告警分析与响应实战

1. 项目概述:一个为蓝队而生的智能安全运营平台

如果你是一名安全运营中心的分析师,每天面对海量、重复且低价值的告警,手动筛选、关联、研判,最后写报告,这个过程想必已经让你疲惫不堪。或者,你是一名安全工程师,正在为团队寻找一个既能自动化处理日常告警,又能灵活集成现有工具链,还能利用AI提升研判效率的平台,但市面上的商业SOAR要么太贵,要么太“重”,要么不够开放。今天要聊的这个开源项目Agentic SOC Platform,可能就是你在找的那个答案。

简单来说,ASP是一个以智能体为核心、高度可扩展的开源自动化安全运营平台。它不是一个简单的脚本集合,而是一个完整的工程化框架,旨在将安全运营中那些繁琐、重复的流程——从告警接入、AI辅助分析、事件分诊到自动化响应——串联起来,形成一个闭环。它的核心卖点在于“Agentic”,即智能体驱动。这意味着它内置了对LangChain、LangGraph、Dify等主流AI Agent框架的支持,允许你将大语言模型的能力无缝嵌入到安全分析工作流中,让AI成为你团队里不知疲倦的初级分析师。

我第一次接触这个项目时,最吸引我的是它的务实架构:用Webhook+Redis Stream处理实时告警流,用基于Nocoly的SIRP作为可视化事件管理前台,用Python模块化引擎驱动一切。这听起来技术栈很清晰,没有为了“炫技”而引入不必要的复杂性。接下来,我会结合自己的部署和测试经验,带你深入拆解ASP的架构设计、核心模块的运作原理、一步步的部署实操,以及那些官方文档里可能没写的“坑”和技巧。

2. 核心架构与设计哲学拆解

在深入代码之前,理解ASP的架构设计思路至关重要。这能帮助你在后续的定制开发或故障排查时,清楚地知道数据流向了哪里,以及为什么要这么设计。

2.1 分层架构与数据流

ASP的架构可以清晰地分为三层:接入层、处理层和呈现层。整个数据流是单向且解耦的,这保证了系统的稳定性和可扩展性。

  1. 接入层:这一层负责与外部世界对接。核心组件是Webhook Receiver。你的SIEM(如Splunk、Elastic SIEM)、EDR或任何能发送HTTP请求的安全工具,都可以通过配置Webhook,将告警事件以JSON格式推送到ASP的指定端点。这里的设计巧妙之处在于,它没有试图去适配所有SIEM的私有API,而是采用了Webhook这个几乎通用的标准,大大降低了集成门槛。你只需要在SIEM里配一个HTTP告警动作即可。

  2. 处理层:这是ASP的大脑,也是最具特色的部分。它进一步分为两个并行的处理引擎:

    • 模块引擎:用于流式处理。Webhook接收到的告警会被立即推送到一个Redis Stream中。每个告警类型(你可以自定义)可以对应一个独立的Stream。然后,后台运行的“模块”会作为消费者,从指定的Stream中拉取告警消息进行处理。一个模块就是一个独立的Python处理单元,它可以调用AI模型进行告警摘要、风险评分,可以查询威胁情报进行IoC富化,也可以执行简单的逻辑判断。处理完成后,模块会将结果(一个标准化的安全记录)发送到下一站。
    • 剧本引擎:用于交互式批处理。当安全分析师在SIRP界面上看到一个具体的事件、告警或线索时,他可以手动触发一个“剧本”。剧本是一系列更复杂、可能涉及多步骤交互的自动化动作,比如对一个可疑IP进行全链条调查(查Whois、查威胁情报、查历史活动),或者对一台失陷主机执行隔离操作。剧本的执行是由PlaybookLoader引擎驱动的。
  3. 呈现层:即内置的SIRP平台。所有经过模块处理后的标准化记录,都会在这里被创建为安全事件、告警或证据链。SIRP基于Nocoly开发,提供了一个开箱即用的Web界面,用于事件管理、团队协作、报告生成。更重要的是,它作为人机交互界面,是分析师触发剧本操作的入口。

为什么选择Redis Stream?这是一个关键设计点。相比更常见的消息队列如RabbitMQ或Kafka,Redis Stream在ASP这个场景下有几个优势:一是轻量,部署简单,与ASP的Python技术栈集成顺畅;二是它支持消息的持久化和消费者组,能确保告警不丢失,并且允许多个模块实例并行处理同一个Stream以实现负载均衡;三是它的“Stream”数据结构非常贴合“事件流”这个概念,每个告警就是一个带时间戳的条目,便于追溯。

2.2 智能体集成模式解析

“Agentic”是项目的灵魂。ASP没有自己再造一个AI轮子,而是选择了拥抱生态,主要集成了两类框架:

  1. LangChain/LangGraph模式:这是给开发者准备的。如果你熟悉LangChain,你可以编写自己的Chain或Agent,然后在ASP的模块中直接调用。LangGraph的引入尤其适合构建有状态、多步骤的复杂分析流程。例如,你可以设计一个Agent,它先分析告警描述,然后决定去查询哪几个威胁情报源,最后综合所有信息给出处置建议。ASP提供了一个运行环境,让你这些复杂的AI逻辑能作为一个服务持续运行。

  2. Dify模式:这是给运营人员和安全分析师准备的。Dify是一个可视化AI工作流构建工具。ASP可以与Dify集成,意味着你可以通过拖拽的方式,在Dify中设计一个告警分析工作流(比如:输入告警文本 -> 调用GPT提取关键实体 -> 查询VirusTotal -> 生成研判报告),然后将这个工作流发布为一个API。ASP的模块只需要调用这个API即可。这大大降低了将AI能力融入安全流程的技术门槛。

在实际项目中如何选择?我的经验是:对于固定的、逻辑相对简单的AI任务(如文本分类、摘要),可以直接在ASP模块里用OpenAI API或本地LLM的SDK实现。对于需要频繁调整、探索性强的复杂分析流程,先用Dify快速原型化,验证效果后再考虑是否用LangGraph固化为代码。对于追求极致性能和控制的场景,则直接用LangChain/LangGraph开发原生模块。

2.3 扩展性设计:模块与插件

ASP的整个框架由Python写成,其扩展性体现在两个方面:

  • 模块:这是核心处理单元。每个模块都是一个独立的Python类,继承自基类,需要实现process等方法。官方提供了很多示例模块,比如调用OpenAI的、查询 AbuseIPDB 的。你可以像搭积木一样,编写自己的模块,放入modules目录,系统会自动加载。模块之间通过Redis Stream和内部事件总线通信,耦合度很低。
  • 插件:更多用于扩展SIRP前端的功能,比如添加新的数据可视化组件、自定义报表格式等。

这种设计使得ASP不仅能接入各种安全设备,也能将内部的各种API(CMDB、工单系统、内部威胁情报)轻松集成进来,真正成为企业安全运营的“中枢神经”。

3. 从零开始:环境部署与核心配置实战

理论讲完了,我们动手把它跑起来。我会基于最常见的Linux服务器环境,带你走一遍从Docker部署到接入第一个告警的全过程。

3.1 基础环境准备

假设我们有一台干净的Ubuntu 22.04 LTS服务器。ASP强烈推荐使用Docker Compose部署,这能省去大量依赖管理的麻烦。

# 1. 更新系统并安装必要工具 sudo apt update && sudo apt upgrade -y sudo apt install -y docker.io docker-compose-v2 git curl # 2. 将当前用户加入docker组,避免每次都要sudo sudo usermod -aG docker $USER newgrp docker # 刷新组权限,或退出重新登录 # 3. 克隆项目代码 git clone https://github.com/funnywolf/agentic-soc-platform.git cd agentic-soc-platform

3.2 Docker Compose部署与关键配置调整

项目根目录下已经提供了docker-compose.yml。但在启动前,有几个关键配置必须修改,否则可能会无法运行或存在安全风险。

首先,配置环境变量文件。复制示例文件并编辑:

cp .env.example .env vim .env # 或用你喜欢的编辑器

以下是最需要关注的几个变量:

# AI相关配置 - 如果你暂时不用AI功能,可以先不配,但部分模块可能报错 OPENAI_API_KEY=sk-xxx # 如果用OpenAI OPENAI_BASE_URL=https://api.openai.com/v1 # 如果用的是Azure OpenAI或第三方代理,需修改 LOCAL_LLM_MODEL=llama3:latest # 如果使用本地Ollama等模型 LOCAL_LLM_BASE_URL=http://host.docker.internal:11434 # Ollama通常在本地11434端口 # Redis配置 - 生产环境务必修改密码! REDIS_PASSWORD=YourSuperStrongRedisPassword123! # 改成强密码 REDIS_STREAM_ALERT=alerts # 告警Stream的名称,可按需修改 # 数据库配置 POSTGRES_PASSWORD=YourStrongPostgresPass456! # 改成强密码 POSTGRES_USER=asp_user POSTGRES_DB=asp_db # Webhook接收器密钥 - 用于验证传入的Webhook请求,非常重要! WEBHOOK_SECRET_KEY=YourWebhookSecretKey789! # 生成一个随机字符串 # SIRP前端访问密钥 SIRP_ADMIN_API_KEY=AnotherSecretKeyForSIRP101112

安全警告.env文件包含所有敏感信息。绝对不要将其提交到版本控制系统。在生产环境中,应考虑使用Docker Secrets或专门的密钥管理服务。

其次,检查Docker Compose文件。打开docker-compose.yml,确保所有服务的端口映射符合你的预期,并且没有不必要的端口暴露到公网。通常,我们只需要将SIRP的Web界面端口(如3000)和Webhook接收端口(如5000)映射出来。

3.3 启动服务与初始化

配置完成后,一键启动所有服务:

docker-compose up -d

使用docker-compose logs -f可以查看实时日志,观察各容器启动是否正常。重点关注webhook-receivermodule-enginesirp-web这几个服务。

首次启动后,需要初始化SIRP的数据库结构。通常SIRP容器在启动时会自动执行迁移。但为了保险,可以手动执行:

# 进入SIRP的容器执行数据库迁移 docker-compose exec sirp-web python manage.py migrate # 创建超级用户(用于登录SIRP后台) docker-compose exec sirp-web python manage.py createsuperuser

按照提示输入用户名、邮箱和密码。这个账号将用于登录SIRP管理界面。

3.4 验证部署与初步访问

  1. 检查服务状态:访问http://你的服务器IP:3000,应该能看到SIRP的登录界面。
  2. 登录SIRP:使用上一步创建的超级用户账号登录。
  3. 检查Webhook服务:访问http://你的服务器IP:5000/health,应该返回一个简单的健康状态JSON。这证明Webhook接收器已就绪。
  4. 查看Redis数据:可以进入Redis容器,查看Stream是否创建。
    docker-compose exec redis redis-cli -a YourSuperStrongRedisPassword123! > XINFO STREAMS alerts # 查看alerts stream的信息

如果以上步骤都成功,恭喜你,ASP平台的基础骨架已经搭建起来了。但它现在还是个空壳,没有处理逻辑。接下来我们要让它“活”起来。

4. 核心模块开发与AI能力集成实战

平台跑起来了,现在我们来打造它的“肌肉”——编写和配置处理模块。我们以一个最常见的场景为例:接收一个来自SIEM的暴力破解告警,调用AI模型分析其严重性,并自动创建SIRP事件

4.1 理解模块的基本结构

modules/目录下,官方提供了很多示例。我们创建一个新的模块文件modules/bruteforce_ai_analyzer.py

一个最基础的模块需要包含以下部分:

import json import logging from typing import Dict, Any from asp_module_engine.base_module import BaseModule # 设置日志 logger = logging.getLogger(__name__) class BruteforceAIAnalyzer(BaseModule): """针对暴力破解告警的AI分析模块""" # 模块的元数据 name = "bruteforce_ai_analyzer" description = "使用LLM分析暴力破解告警,评估风险并富化信息。" version = "1.0.0" # 该模块监听的Redis Stream名称,必须与Webhook配置的target_stream一致 input_stream = "alerts" def __init__(self, config: Dict[str, Any]): super().__init__(config) # 初始化你的资源,比如AI客户端、情报API客户端等 self.llm_client = None # 这里后续会初始化 self.initialize_llm() def initialize_llm(self): """初始化LLM客户端。这里演示使用OpenAI API。""" try: # 从环境变量或配置中读取API Key api_key = self.config.get("openai_api_key") or os.getenv("OPENAI_API_KEY") base_url = self.config.get("openai_base_url") or os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") # 这里以openai库为例,实际可用langchain等 from openai import OpenAI self.llm_client = OpenAI(api_key=api_key, base_url=base_url) logger.info("LLM client initialized successfully.") except Exception as e: logger.error(f"Failed to initialize LLM client: {e}") self.llm_client = None async def process(self, message: Dict[str, Any]) -> Dict[str, Any]: """ 核心处理方法。从Redis Stream中消费一条消息,进行处理。 :param message: 从Stream中读取的原始消息数据 :return: 处理后的结果字典,通常会被发送到SIRP """ alert_data = message.get("data", {}) alert_id = alert_data.get("id", "unknown") logger.info(f"Processing bruteforce alert: {alert_id}") # 1. 提取关键信息 source_ip = alert_data.get("source_ip") target_user = alert_data.get("target_user", "unknown") attempt_count = alert_data.get("attempt_count", 1) raw_message = alert_data.get("message", "") # 2. 调用AI进行分析 analysis_result = await self.analyze_with_ai(raw_message, source_ip, attempt_count) # 3. 构建标准化的输出格式(SIRP期望的格式) output_record = { "type": "alert", "source": "bruteforce_ai_analyzer", "severity": analysis_result.get("severity", "medium"), "confidence": analysis_result.get("confidence", 0.7), "title": f"AI Analyzed Bruteforce: {source_ip} -> {target_user}", "description": analysis_result.get("summary"), "raw_data": alert_data, # 保留原始数据 "enriched_data": { "ai_analysis": analysis_result, "source_ip": source_ip, "target_user": target_user, "tags": ["bruteforce", "ai_analyzed"] + analysis_result.get("tags", []) }, "timestamp": alert_data.get("timestamp") # 使用原始告警时间 } logger.info(f"Alert {alert_id} processed. Severity: {output_record['severity']}") return output_record async def analyze_with_ai(self, raw_message: str, source_ip: str, attempt_count: int) -> Dict[str, Any]: """调用LLM分析告警""" if not self.llm_client: return {"summary": "LLM not available.", "severity": "medium", "confidence": 0.5} prompt = f""" 你是一名网络安全分析师。请分析以下暴力破解告警: 原始告警信息:{raw_message} 源IP地址:{source_ip} 尝试次数:{attempt_count} 请完成以下任务: 1. 用一句话总结该告警的核心风险。 2. 评估严重等级(critical, high, medium, low, info)。 3. 给出置信度(0到1之间的小数)。 4. 提供两到三个下一步调查或处置建议。 5. 生成几个相关的标签(tags)。 请以JSON格式回复,包含以下键:summary, severity, confidence, suggestions (列表), tags (列表)。 """ try: response = self.llm_client.chat.completions.create( model="gpt-3.5-turbo", # 或你配置的模型 messages=[{"role": "user", "content": prompt}], temperature=0.2, # 低温度,输出更稳定 response_format={ "type": "json_object" } # 要求返回JSON ) ai_output = json.loads(response.choices[0].message.content) return ai_output except Exception as e: logger.error(f"AI analysis failed: {e}") return { "summary": f"AI分析失败: {str(e)}", "severity": "medium", "confidence": 0.3, "suggestions": ["检查AI服务连接"], "tags": ["ai_failed"] }

4.2 配置模块并使其生效

编写完模块后,需要告诉ASP引擎加载它。编辑config/modules.yaml(如果不存在则创建):

modules: - name: bruteforce_ai_analyzer class: modules.bruteforce_ai_analyzer.BruteforceAIAnalyzer enabled: true config: # 模块自身的配置,会传递给__init__中的config参数 openai_api_key: "{{ env.OPENAI_API_KEY }}" # 可以使用Jinja2模板引用环境变量 openai_base_url: "{{ env.OPENAI_BASE_URL }}" input_stream: alerts # 监听的Stream,与代码中一致

关键一步:需要重启module-engine服务以加载新配置。

docker-compose restart module-engine

通过docker-compose logs -f module-engine查看日志,确认新模块被成功加载,没有报错。

4.3 配置Webhook接收与路由

现在模块准备好了,我们需要让告警能流到这个模块。这需要配置Webhook接收器,告诉它收到某种类型的告警后,应该丢到哪个Redis Stream。

编辑config/webhook_routes.yaml

routes: - name: siem_bruteforce_alerts path: "/webhook/siem/bruteforce" # Webhook的URL路径 method: "POST" target_stream: "alerts" # 将数据发送到'alerts' stream # 验证规则(可选但推荐) validation: secret_header: "X-Webhook-Secret" secret_value: "YourWebhookSecretKey789!" # 与.env中的WEBHOOK_SECRET_KEY一致 # 数据转换(可选):将不同SIEM的格式统一为ASP内部格式 transform: | function transform(payload) { // 假设SIEM发送的格式是 {“event”: {“src_ip”: “1.2.3.4”, “user”: “admin”, “count”: 10}} return { “id”: payload.event.id || Date.now().toString(), “source_ip”: payload.event.src_ip, “target_user”: payload.event.user, “attempt_count”: payload.event.count, “message”: `Bruteforce attempt from ${payload.event.src_ip} on user ${payload.event.user}`, “timestamp”: new Date().toISOString(), “raw_payload”: payload // 保留原始数据 }; }

重启webhook-receiver服务:

docker-compose restart webhook-receiver

至此,一个完整的处理流水线就配置好了:SIEM --Webhook--> ASP Webhook Receiver --(经转换)--> Redis Stream ‘alerts’ --被消费--> BruteforceAIAnalyzer模块 --输出--> SIRP

4.4 测试端到端流程

我们可以用curl模拟SIEM发送一个告警来测试整个链路:

curl -X POST http://localhost:5000/webhook/siem/bruteforce \ -H "Content-Type: application/json" \ -H "X-Webhook-Secret: YourWebhookSecretKey789!" \ -d '{ “event”: { “id”: “test_alert_001”, “src_ip”: “192.168.1.100”, “user”: “Administrator”, “count”: 15, “timestamp”: “2023-10-27T10:30:00Z” } }'

然后,依次检查:

  1. Webhook日志docker-compose logs webhook-receiver应该显示收到请求并转发。
  2. 模块引擎日志docker-compose logs module-engine应该显示bruteforce_ai_analyzer模块处理了一条告警,并打印出AI分析的结果和严重性。
  3. SIRP界面:登录SIRP,在“告警”或“事件”页面,应该能看到一条新的记录,标题类似“AI Analyzed Bruteforce: 192.168.1.100 -> Administrator”,并且包含了AI生成的摘要和建议。

如果一切顺利,你就成功搭建了一个AI驱动的自动化告警分析流水线!

5. 生产环境部署的进阶考量与避坑指南

在测试环境跑通只是第一步。要将ASP用于生产,还需要考虑更多因素。以下是我在实际部署中积累的一些关键经验和常见问题的解决方案。

5.1 性能、高可用与监控

  • 性能调优

    • Redis:生产环境务必为Redis配置持久化(AOF或RDB),并分配足够内存。如果告警量巨大,考虑使用Redis Cluster。docker-compose.yml中的默认配置可能不够。
    • 模块引擎module-engine默认是单进程。对于CPU密集型的模块(如复杂的AI推理),可以考虑启动多个实例,并利用Redis Stream的消费者组特性实现并行处理。这需要修改部署配置,可能要用到Kubernetes或更复杂的Docker Compose编排。
    • 数据库:PostgreSQL是SIRP的后端。需要根据事件数量定期进行数据库维护(如清理旧数据、建立索引)。SIRP的管理界面可能自带一些维护任务。
  • 高可用:核心在于无状态服务的多实例部署和有状态服务的备份。

    • Webhook ReceiverModule Engine是无状态的,可以水平扩展,前面用负载均衡器(如Nginx)分发流量。
    • RedisPostgreSQL是有状态的,需要部署主从复制或集群方案。对于中小规模,可以使用云服务的托管数据库,它们通常内置了高可用。
    • SIRP Web:也可以多实例部署,共享同一个数据库和Redis。
  • 监控与日志

    • 应用日志:ASP各组件都输出标准日志到stdout,被Docker捕获。生产环境应该使用docker-compose的日志驱动,将日志集中发送到ELK、Loki等日志平台。
    • 业务指标:需要自己埋点或通过外部监控。关键指标包括:Webhook接收速率、各Stream的消息堆积数、各模块的处理延迟和错误率、SIRP的API响应时间。
    • 健康检查:Docker Compose文件应该为每个服务配置healthcheck,以便在容器不健康时自动重启或告警。

5.2 安全加固配置清单

开源项目默认配置往往不适用于生产。以下安全措施必须考虑:

  1. 网络隔离

    • 将ASP的所有服务部署在一个独立的Docker网络或Kubernetes命名空间中。
    • 严格限制暴露的端口。通常只将SIRP Web的端口(如443,通过反向代理)和Webhook Receiver的端口(如5000)暴露给内部网络或特定的SIEM IP地址段。切勿将管理端口(如PostgreSQL的5432,Redis的6379)暴露给公网
    • 在Webhook Receiver前部署反向代理(如Nginx),并配置TLS终止、速率限制和基于IP的访问控制列表。
  2. 认证与授权

    • Webhook:必须使用WEBHOOK_SECRET_KEY,并在所有Webhook请求头中验证。这是防止恶意数据注入的第一道防线。
    • SIRP:充分利用其内置的用户角色权限系统。不要所有人都用超级管理员账号。为分析师、只读用户等创建不同角色的账号。
    • API密钥:所有AI服务、威胁情报API的密钥都必须通过环境变量或密钥管理服务注入,绝不能硬编码在配置文件或代码中。
  3. 数据安全

    • 定期备份PostgreSQL数据库和Redis的持久化文件。
    • 考虑对数据库中的敏感字段(如原始告警中的密码哈希、个人信息)进行加密存储。
    • 制定数据保留策略,定期归档或清理过期的告警和事件数据。

5.3 常见问题与排查实录

即使按照指南操作,你也可能会遇到一些问题。这里记录几个我踩过的“坑”:

  • 问题一:Module Engine启动失败,报错“ModuleNotFoundError: No module named ‘xxx’”

    • 原因:模块文件存在语法错误,或者模块依赖的Python包没有安装。
    • 排查
      1. 检查模块文件的Python语法:python -m py_compile your_module.py
      2. 查看module-engine容器的日志,错误信息通常会指出具体是哪一行。
      3. 确认你的模块是否在config/modules.yaml中正确注册,class的路径是否正确。
      4. 如果模块依赖第三方库,需要在Dockerfile中为module-engine镜像添加安装命令,或者通过pip install在容器启动时安装。
  • 问题二:Webhook发送成功,但SIRP里看不到事件

    • 原因:这是数据流中断的典型表现。需要分段排查。
    • 排查步骤
      1. 查Webhook接收器日志docker-compose logs webhook-receiver,确认请求是否被正确接收,状态码是否为200。如果返回4xx,检查路由配置和Secret验证。
      2. 查Redis:进入Redis容器,查看目标Stream是否有新消息:XREAD COUNT 1 STREAMS alerts 0
      3. 查Module Engine日志:确认对应的模块是否被触发,处理过程中是否有异常。模块处理后的输出会发送到SIRP的API,检查这里是否有网络或认证错误。
      4. 查SIRP API日志:SIRP的Web服务日志可能会记录创建事件时遇到的错误,比如数据格式不符合预期。
  • 问题三:AI模块响应缓慢,导致Stream消息堆积

    • 原因:调用外部LLM API(尤其是GPT-4)或本地大模型推理速度慢,成为性能瓶颈。
    • 解决方案
      1. 异步处理:确保模块的process方法是async的,并且内部调用AI API时使用了异步客户端(如httpx.AsyncClientopenai.AsyncOpenAI)。这样可以在等待AI响应的同时处理其他消息。
      2. 批量处理:修改模块逻辑,一次从Stream中读取多条消息(XREADCOUNT参数),然后批量发送给AI API(如果API支持),可以显著提高吞吐量。
      3. 缓存:对于重复出现的源IP、哈希值等,可以将AI分析结果缓存到Redis中一段时间,避免对相同指标重复分析。
      4. 降级方案:在AI服务不可用或超时时,模块应能自动降级,使用基于规则的简单分析,保证流程不中断。
  • 问题四:自定义模块的配置不生效

    • 原因config/modules.yaml中的配置没有正确传递给模块,或者模块代码中读取配置的方式不对。
    • 检查
      1. 模块的__init__方法是否正确接收了config参数,并调用super().__init__(config)
      2. 在模块内部,通过self.config.get(‘key’)来读取配置。
      3. 确保YAML文件语法正确,缩进无误。可以使用在线YAML校验器检查。
      4. 重启module-engine服务使配置生效。

6. 从自动化到智能化:构建复杂响应剧本

模块处理了告警,创建了事件。接下来,安全分析师在SIRP中查看事件时,可以手动触发更复杂的自动化操作,这就是“剧本”的用武之地。剧本比模块更侧重于交互式多步骤的响应动作。

6.1 剧本的概念与编写

一个剧本本质上是一个Python脚本,它可以通过SIRP的UI被触发,并作用于一个具体的对象(事件、告警、资产等)。剧本可以执行诸如“隔离主机”、“查询全量威胁情报”、“生成处置报告并发送邮件”等操作。

剧本存放在playbooks/目录下。我们编写一个简单的剧本,用于对事件中的可疑IP进行全面的威胁情报查询。

创建playbooks/enrich_ip_threat_intel.py:

#!/usr/bin/env python3 import requests import json from typing import Dict, Any, List # 剧本的元信息,用于在SIRP UI中显示 METADATA = { “name”: “IP威胁情报富化”, “description”: “对事件中的IP地址进行多源威胁情报查询。”, “author”: “Your Team”, “version”: “1.0”, “supported_object_types”: [“case”, “alert”] # 可以针对哪些类型的对象触发 } # 需要用户输入的参数(会在SIRP UI中显示为表单) INPUT_SCHEMA = [ { “name”: “ip_address”, “type”: “string”, “description”: “要查询的IP地址”, “required”: True }, { “name”: “intel_sources”, “type”: “multiselect”, “description”: “选择要查询的情报源”, “options”: [“AbuseIPDB”, “VirusTotal”, “Shodan”, “GreyNoise”], “default”: [“AbuseIPDB”, “VirusTotal”], “required”: True } ] def run(playbook_input: Dict[str, Any], sirp_api_client, context: Dict[str, Any]) -> Dict[str, Any]: """ 剧本的主函数。 :param playbook_input: 用户通过UI输入的表单数据 :param sirp_api_client: 一个预配置好的客户端,用于与SIRP API交互(如更新事件) :param context: 上下文信息,如触发剧本的对象ID、类型等 :return: 一个结果字典,会显示在SIRP的剧本执行结果中 """ ip = playbook_input.get(“ip_address”) sources = playbook_input.get(“intel_sources”, []) if not ip: return {“success”: False, “error”: “未提供IP地址”} results = {“ip”: ip, “sources”: {}} # 1. 查询AbuseIPDB (示例) if “AbuseIPDB” in sources: abuseipdb_result = query_abuseipdb(ip) results[“sources”][“AbuseIPDB”] = abuseipdb_result # 2. 查询VirusTotal (示例) if “VirusTotal” in sources: vt_result = query_virustotal(ip) results[“sources”][“VirusTotal”] = vt_result # 3. 将结果作为“证据”添加到SIRP事件中 object_id = context.get(“object_id”) object_type = context.get(“object_type”) if object_id and object_type: artifact_data = { “type”: “threat_intel_report”, “data”: results, “description”: f”IP {ip} 的多源威胁情报查询结果”, “tags”: [“playbook”, “threat_intel”, “ip_enrichment”] } # 使用API客户端添加证据 try: sirp_api_client.create_artifact(object_type, object_id, artifact_data) results[“sirp_artifact_created”] = True except Exception as e: results[“sirp_artifact_error”] = str(e) # 4. 总结判断 malicious_indicators = 0 for source, data in results[“sources”].items(): if data.get(“is_malicious”): malicious_indicators += 1 results[“summary”] = { “is_suspicious”: malicious_indicators > 0, “malicious_source_count”: malicious_indicators, “recommendation”: “建议封锁此IP” if malicious_indicators > 1 else “建议持续监控” } return {“success”: True, “results”: results} def query_abuseipdb(ip: str) -> Dict[str, Any]: """模拟查询AbuseIPDB API""" # 实际使用时,需要配置API Key并从环境变量读取 api_key = os.getenv(“ABUSEIPDB_API_KEY”) if not api_key: return {“error”: “API key not configured”, “is_malicious”: False} # 这里是伪代码,实际需要调用requests库 # response = requests.get(f“https://api.abuseipdb.com/api/v2/check?ipAddress={ip}”, headers={...}) # return parse_response(response) # 模拟返回 return { “abuseConfidenceScore”: 85, “totalReports”: 12, “is_malicious”: True, “lastReportedAt”: “2023-10-26T08:30:00Z” } def query_virustotal(ip: str) -> Dict[str, Any]: """模拟查询VirusTotal API""" # 类似AbuseIPDB,需要API Key和实际调用 return { “harmless_votes”: 5, “malicious_votes”: 8, “suspicious_votes”: 2, “is_malicious”: True, “detected_urls”: [“http://malicious-site.com/”] }

6.2 在SIRP中配置与触发剧本

剧本写好后,需要在SIRP的管理界面进行注册和配置。

  1. 放置剧本文件:将enrich_ip_threat_intel.py放到ASP容器内的/app/playbooks目录(通常通过Docker volume映射)。

  2. 重启PlaybookLoader服务docker-compose restart playbook-loader,使其加载新剧本。

  3. 在SIRP UI中配置

    • 以管理员身份登录SIRP。
    • 进入“设置”或“剧本管理”区域。
    • 应该能看到系统自动扫描到的“IP威胁情报富化”剧本。
    • 点击配置,可能需要输入各个情报源的API密钥(这些密钥可以存储在SIRP的密钥管理器中,剧本通过sirp_api_client或环境变量获取)。
    • 启用该剧本,并分配可以执行它的用户角色。
  4. 触发剧本

    • 分析师在SIRP中打开一个具体的事件或告警。
    • 在详情页的“操作”或“剧本”选项卡中,会看到可用的剧本列表。
    • 选择“IP威胁情报富化”,会弹出表单让你输入要查询的IP(通常可以自动填充当前事件中的IP),选择情报源。
    • 点击执行,剧本会在后台运行。执行完成后,结果会显示在页面上,并且生成的新证据(Threat Intel Report)会自动关联到该事件。

通过这种方式,安全团队可以将那些需要多步骤、调用多个外部API的复杂调查动作,沉淀为可重复使用的“剧本”,极大提升事件响应的效率和一致性。

http://www.jsqmd.com/news/787335/

相关文章:

  • AI驱动项目规划平台:从自然语言到可执行计划的智能拆解
  • gentoo安装linuxwallpaperengine
  • MIPS32 34K多线程处理器架构与优化解析
  • 命令行交互革命:用Rust TUI工具cliclaw提升终端效率
  • Python轻量级定时任务库timetask:原理、实战与选型指南
  • 数据智能体分级框架与L2级实战:从概念到工程落地
  • 开源硬件徽章设计:从ESP32/RP2040选型到LED驱动与功耗管理实战
  • 法律领域可论证AI:从可解释到可信推理的工程实践
  • 多智能体开发环境配置实战:从环境即代码到团队协作
  • CANN DeepSeek-V3.2-Exp PyPTO融合算子开发
  • 多机器人协作运输系统的强化学习实现与优化
  • 053、BLDC有感控制与无感控制
  • Minecraft服务器网关Gateward:提升稳定性与安全性的现代化代理方案
  • 基于AWS Bedrock与OpenSearch构建企业级RAG智能问答系统
  • PromptCraft-Robotics:用大语言模型与提示工程控制机器人仿真
  • ailia-models:跨平台AI模型推理库与预训练模型仓库实战指南
  • mcp-use:统一工具管理与工作流编排的模块化平台实践
  • 2026年4月国内热门的扫描仪生产厂家推荐,智能扫描系统/高精度平面扫描仪/刀模扫描仪/玻璃扫描仪,扫描仪定制厂家有哪些 - 品牌推荐师
  • 054、反电动势检测与无感控制
  • Cursor AI编程助手成本计算器:开源工具精准估算Token开销
  • 脑机接口可解释AI:从黑箱到透明决策的技术实现与应用挑战
  • 2026AI大模型API中转服务全网实测:多维度评测,为企业与开发者提供精准选型参考
  • 基于MCP协议构建金融数据服务器:AI Agent与量化分析实践
  • AI模型公平性挑战与缓解策略:从数据偏见到算法公正
  • GPT-4o图像生成实战:从提示词工程到多模态创作全解析
  • 055 步进电机控制:整步、半步、细分
  • 目标导向DNN分割:实现边缘AI低能耗推理的动态聚焦技术
  • KnowLM开源框架:知识增强大模型在信息抽取与对话中的实践指南
  • 怎么在 Node.js 环境下实现 DeepSeek 接口的 SSE 流式响应接收
  • 物理信息AI与神经拉格朗日大涡模拟:CFD湍流建模新范式