从零构建CobaltStrike流量解密工具:实战AES与RSA密钥提取
1. 项目概述与核心价值
最近在分析一些威胁流量时,CobaltStrike的加密通信总是绕不开的坎。作为一个在红蓝对抗和威胁狩猎领域摸爬滚打了十多年的老手,我深知直接解密这些流量对于理解攻击者意图、提取IoC(入侵指标)乃至溯源的重要性。市面上虽然有一些现成的工具,比如Didier Stevens大佬的1768.py和cs-decrypt-metadata.py,但它们更像是“黑盒”或者特定场景下的解决方案。当你遇到一个非标准端口、自定义的元数据格式,或者只是想深入理解Beacon与C2服务器之间那层加密外壳究竟是如何构建和剥开的,自己动手从零构建一个解密工具就成了必经之路。
这个项目,“从零构建CobaltStrike流量解密工具:实战AES与RSA密钥提取”,核心目标就是带你穿透迷雾,亲手实现这个解密过程。它不仅仅是调用几个库函数那么简单,而是深入CobaltStrike Beacon的通信协议骨髓,理解其如何利用RSA非对称加密来安全传递AES对称加密的密钥,最终实现流量的明文还原。这对于安全分析师、逆向工程师和威胁情报研究人员来说,是一项极其硬核且实用的技能。通过这个项目,你将能透彻掌握从网络流量(PCAP)中识别Beacon通信、提取加密载荷、逆向RSA公钥、解密元数据获取AES密钥,直至最终解密所有C2指令和回传数据的完整链条。你会发现,一旦掌握了原理,那些看似牢不可破的加密流量,其实有迹可循。
2. CobaltStrike Beacon通信协议深度解析
要解密流量,首先必须成为协议的“知情人”。CobaltStrike Beacon与其团队服务器(Team Server)之间的通信,是一套设计精巧的加密握手与数据传输机制。
2.1 通信流程与加密框架
Beacon的通信主要分为两个阶段:元数据交换和加密会话通信。整个流程可以类比为一个需要双重锁具的保险箱运送过程。
初始上线(元数据交换):
- Beacon在受害主机上首次启动时,会生成一个随机的AES密钥(我们称之为
Session Key)。这个密钥将是后续所有通信加密解密的唯一对称密钥。 - Beacon会收集当前系统的一些信息,如计算机名、用户名、进程ID、IP地址等,组合成一个元数据(Metadata)块。
- 关键一步来了:Beacon使用内置的或从C2服务器首次响应中获取的RSA公钥,对这个
元数据块进行加密。加密后的数据通常会被编码(如Base64)后,作为HTTP Cookie(默认名称为Cookie)或POST数据的一部分,发送给C2服务器。 - C2服务器持有对应的RSA私钥,它可以解密这个Cookie,从而获得Beacon的元数据以及最重要的——那个随机生成的
AES Session Key。至此,服务器和Beacon共享了同一个对称密钥。
- Beacon在受害主机上首次启动时,会生成一个随机的AES密钥(我们称之为
后续心跳与任务通信(加密会话):
- 在此之后的所有通信(包括Beacon的心跳GET请求、服务器下发的任务指令、Beacon返回的任务结果),其**载荷(Payload)**部分都会使用上一步共享的
AES Session Key进行加密。 - 加密模式通常为AES-256-CBC。每次加密都会使用一个随机的初始化向量(IV),这个IV会预置在加密数据块的前面,一起传输。
- 因此,在网络上捕获的流量,除了最初的元数据交换包,其余的数据包内容看起来都是毫无规律的二进制乱码。
- 在此之后的所有通信(包括Beacon的心跳GET请求、服务器下发的任务指令、Beacon返回的任务结果),其**载荷(Payload)**部分都会使用上一步共享的
这个设计的精妙之处在于:利用RSA非对称加密的安全性来安全传递对称密钥,再利用AES对称加密的高效性来处理大量的后续通信数据。攻击者的RSA私钥(存在于团队服务器的.cobaltstrike.beacon_keys文件中)是这个信任链的根。如果私钥泄露,整个通信链即可被解密。
2.2 密钥文件(.cobaltstrike.beacon_keys)的结构
这个文件是解密的“金钥匙”。它是一个Java序列化对象文件,通常包含以下核心信息:
- RSA公钥(Public Key):以X.509格式存储,Beacon用它来加密元数据。
- RSA私钥(Private Key):以PKCS#8格式存储,团队服务器用它来解密元数据。这也是我们作为分析者梦寐以求的东西。
- 密钥的哈希值:用于校验。
在破解版CobaltStrike泛滥的背景下,一个严重的安全问题出现了:许多攻击者直接使用破解版自带的、相同的.beacon_keys文件。这就导致了一个惊人的事实——互联网上相当一部分CobaltStrike服务器使用的RSA密钥对是重复的。安全研究人员通过扫描,已经收集到了这些常见的“默认”私钥。这意味着,即使你没有捕获到最初的握手包,只要流量来自使用这些常见密钥对的服务器,你依然可以直接用已知私钥进行解密尝试。我们的工具需要兼顾这两种情况:使用从流量中提取的公钥进行推导(或碰撞),以及支持直接使用已知的私钥列表进行快速解密。
3. 工具整体设计与模块拆解
我们的解密工具不会是一个庞然大物,而是一个由几个功能清晰、可独立也可协同的Python脚本组成的工具箱。核心思路是模块化,便于调试和扩展。
3.1 核心模块规划
流量解析与提取模块(
pcap_parser.py):- 输入:包含CobaltStrike流量的PCAP/PCAPNG文件。
- 功能:
- 使用
pyshark或scapy库解析数据包。 - 通过特征(如特定URI路径
/submit.php,/.a,/pixel等,或特定的User-Agent)过滤出疑似CobaltStrike HTTP/HTTPS流量。 - 识别并提取两个关键部分:
- 元数据Cookie:从HTTP请求头中提取
Cookie字段的值(通常是Base64编码的RSA加密数据)。 - 加密载荷:从HTTP POST请求体或GET/POST的特定参数中提取加密的二进制数据。
- 元数据Cookie:从HTTP请求头中提取
- 使用
- 输出:结构化的会话列表,包含每个会话的元数据密文、加密载荷数组、源/目的IP和端口。
RSA密钥处理模块(
rsa_utils.py):- 功能:
- 公钥提取与解析:从Beacon配置(使用
1768.py等工具从Beacon二进制文件中提取)或从网络流量中间接获取公钥。如果只有公钥,我们可以将其用于加密测试或与其他信息结合分析。 - 私钥管理:维护一个已知私钥的数据库(可以从公开研究如NVISO的报告、开源项目
CobaltStrikeParser等获取)。支持加载PEM格式的私钥文件。 - 元数据解密:使用RSA私钥解密提取到的元数据Cookie。解密后得到原始数据,其中包含AES Session Key和系统信息。
- 公钥提取与解析:从Beacon配置(使用
- 输出:解密后的元数据明文,特别是那个32字节(256位)的AES密钥。
- 功能:
AES流量解密模块(
aes_decryptor.py):- 功能:
- 接收从元数据中解密得到的AES密钥。
- 解析加密载荷:识别出载荷前16字节(或特定长度)的初始化向量(IV)。
- 使用AES-256-CBC模式,用提供的密钥和IV,对剩余的载荷数据进行解密。
- 解密后的数据通常是另一个层级的编码(如Base64)或压缩(如Gzip),需要进一步处理才能得到可读的指令或结果。
- 输出:解密后的原始指令数据(可能是Shell命令、文件数据、任务结果等)。
- 功能:
配置提取辅助模块(依赖
1768.py):- 我们不会重造轮子。Didier Stevens的
1768.py是一个极其优秀的Beacon配置提取工具。我们的工具可以集成调用它,或者借鉴其思路。 - 它可以直接从Beacon的二进制文件(.exe, .dll, shellcode)中提取出C2服务器地址、端口、通信路径、睡眠时间等配置,并且能直接判断该Beacon使用的RSA密钥是否为已知私钥。这能为我们提供至关重要的起点信息。
- 我们不会重造轮子。Didier Stevens的
3.2 工具工作流
整个工具链的工作流程如下:
输入PCAP文件 ↓ [流量解析模块] -> 提取出元数据Cookie和加密载荷列表 ↓ |--- (路径A: 有已知私钥) ---| | ↓ | [RSA密钥模块]使用已知私钥解密元数据 -> 获得AES密钥 | | |--- (路径B: 需从Beacon文件提取) ---| | ↓ | 使用`1768.py`分析Beacon样本 -> 获得配置及密钥提示 | | |--- (路径C: 只有流量和公钥) ---| | ↓ | 尝试使用公开的常见私钥进行碰撞解密 | ↓ 获得有效的AES Session Key ↓ [AES解密模块]对每一个加密载荷进行解密 ↓ 输出结构化的解密结果(JSON或控制台打印)实操心得:在实际的威胁狩猎中,路径A(已知私钥)和路径B(从捕获的样本提取)是最常见的。路径C(仅公钥碰撞)更像是一种普查或研究行为。因此,构建一个可扩展的已知私钥库是提升工具实战效率的关键。
4. 核心环节实战实现
让我们抛开理论,直接进入代码实战环节。我会用Python作为实现语言,因为它拥有丰富的密码学和网络库。
4.1 实战环境准备与依赖安装
首先,确保你的Python环境在3.8以上。我们将使用以下核心库:
pip install pyshark cryptography requestspyshark:一个基于TShark(Wireshark的命令行工具)的封装,用于解析PCAP文件,比纯Scapy更省内存且功能强大。cryptography:一个功能全面、安全的密码学库,我们将用它进行RSA和AES加解密操作。绝对不要使用已弃用的pycrypto库。requests:可选,用于如果需要从网络下载已知密钥库。
此外,你还需要系统安装Wireshark或至少安装tshark命令行工具,因为pyshark依赖它。
4.2 模块一:PCAP流量解析器实现
我们创建一个cs_pcap_parser.py文件。
import pyshark import base64 from typing import List, Dict, Optional class CobaltStrikePCAPParser: """ 解析PCAP文件,提取CobaltStrike的元数据Cookie和加密载荷。 """ def __init__(self, pcap_path: str): self.pcap_path = pcap_path self.sessions = [] # 存储提取到的会话信息 def extract_by_http_cookie(self, display_filter: str = 'http') -> List[Dict]: """ 通过HTTP Cookie特征提取。 display_filter: pyshark的显示过滤器,可初步缩小范围,如 'http and ip.addr == x.x.x.x' """ cap = pyshark.FileCapture(self.pcap_path, display_filter=display_filter) sessions_map = {} for pkt in cap: try: if hasattr(pkt, 'http'): http_layer = pkt.http src_ip = pkt.ip.src dst_ip = pkt.ip.dst session_key = f"{src_ip}:{pkt[pkt.transport_layer].srcport} -> {dst_ip}:{pkt[pkt.transport_layer].dstport}" # 1. 提取元数据Cookie (通常来自GET请求的Cookie头) if hasattr(http_layer, 'cookie') and http_layer.cookie: # CobaltStrike的Cookie值通常是一长串Base64 cookie_value = http_layer.cookie # 简单启发式:长度较长且包含常见Base64字符 if len(cookie_value) > 50 and all(c.isalnum() or c in '+/=' for c in cookie_value): if session_key not in sessions_map: sessions_map[session_key] = { 'src_ip': src_ip, 'dst_ip': dst_ip, 'metadata_cookie': None, 'encrypted_payloads': [] } sessions_map[session_key]['metadata_cookie'] = cookie_value.strip() # 2. 提取加密载荷 (通常来自POST请求体,或GET的特定参数) # 检查POST数据或长的GET参数 encrypted_data = None if hasattr(http_layer, 'file_data'): # 可能是POST的二进制数据 raw_data = http_layer.file_data.binary_value if raw_data and len(raw_data) > 16: # 假设大于16字节才可能是加密载荷 encrypted_data = raw_data # 你也可以检查特定的URI路径,如/submit.php, /pixel, /.a等 # if hasattr(http_layer, 'request_uri') and '/submit.php' in http_layer.request_uri: if encrypted_data: if session_key not in sessions_map: sessions_map[session_key] = { 'src_ip': src_ip, 'dst_ip': dst_ip, 'metadata_cookie': None, 'encrypted_payloads': [] } sessions_map[session_key]['encrypted_payloads'].append({ 'frame_num': pkt.number, 'data': encrypted_data }) except AttributeError: # 忽略没有HTTP层的包 continue except Exception as e: print(f"Error processing packet {pkt.number}: {e}") continue cap.close() self.sessions = list(sessions_map.values()) return self.sessions def print_sessions(self): """打印提取到的会话信息""" for i, sess in enumerate(self.sessions): print(f"\n--- Session {i+1} ---") print(f"Client: {sess['src_ip']} -> Server: {sess['dst_ip']}") print(f"Metadata Cookie (Base64): {sess['metadata_cookie'][:80]}..." if sess['metadata_cookie'] else "No Metadata Cookie found.") print(f"Number of Encrypted Payloads: {len(sess['encrypted_payloads'])}")注意事项:
pyshark在解析大型PCAP文件时可能较慢。在生产环境中,对于超大文件,可以考虑先用tshark命令行工具预处理提取出关键字段,再用Python处理结果。例如:tshark -r traffic.pcap -Y "http" -T fields -e ip.src -e tcp.srcport -e ip.dst -e tcp.dstport -e http.cookie -e http.file_data。
4.3 模块二:RSA密钥处理与元数据解密
创建cs_rsa_decryptor.py。这里我们假设你已经通过某种方式(如1768.py分析样本,或从公开资源获取)得到了一个PEM格式的RSA私钥。
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend import base64 class MetadataDecryptor: def __init__(self, private_key_pem: str = None, private_key_path: str = None): """ 初始化解密器。 :param private_key_pem: PEM格式的私钥字符串 :param private_key_path: PEM私钥文件路径 """ self.private_key = None if private_key_pem: self.private_key = serialization.load_pem_private_key( private_key_pem.encode(), password=None, backend=default_backend() ) elif private_key_path: with open(private_key_path, 'rb') as f: self.private_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() ) else: # 可以在这里初始化一个已知私钥的列表进行尝试 self.known_keys = self._load_known_keys() def _load_known_keys(self): """加载内置的已知私钥列表(示例,实际需要填充)""" known_keys = {} # 示例:这是一个占位符。实际应从文件或资源加载。 # known_keys['key_hash_here'] = '-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----' return known_keys def decrypt_metadata(self, encrypted_b64_cookie: str) -> Dict: """ 解密Base64编码的元数据Cookie。 :param encrypted_b64_cookie: 从流量中提取的Base64字符串 :return: 解密后的元数据字典,包含AES key等 """ if not self.private_key: raise ValueError("No private key provided for decryption.") try: # 1. Base64解码 encrypted_data = base64.b64decode(encrypted_b64_cookie) # 2. RSA解密 (使用PKCS1v15填充,这是CobaltStrike默认使用的) decrypted_data = self.private_key.decrypt( encrypted_data, padding.PKCS1v15() ) # 3. 解析解密后的数据 # CobaltStrike的元数据格式通常是: [4字节长度][AES Key][其他系统信息...] # 前4字节是小端序的元数据总长度 total_len = int.from_bytes(decrypted_data[:4], byteorder='little') # AES Key通常是接下来的32字节 aes_key = decrypted_data[4:36].hex() # 转换为16进制字符串表示 # 剩余的是系统信息,通常是UTF-16LE编码的字符串,以null分隔 sys_info_raw = decrypted_data[36:total_len] # 尝试解析系统信息(这是一个简化版,实际格式可能更复杂) sys_info = self._parse_system_info(sys_info_raw) return { 'aes_key_hex': aes_key, 'aes_key_bytes': decrypted_data[4:36], 'total_length': total_len, 'system_info': sys_info, 'raw_decrypted': decrypted_data } except Exception as e: print(f"解密元数据失败: {e}") # 可能是填充方式不对,或者是公钥不匹配 # 可以尝试其他填充方式,如OAEP,但CobaltStrike默认是PKCS1v15 return None def _parse_system_info(self, data: bytes) -> Dict: """尝试解析系统信息字段。这是一个复杂且版本相关的部分。""" info = {} try: # 常见格式:计算机名\x00用户名\x00进程ID\x00... # 使用UTF-16LE解码 decoded_str = data.decode('utf-16le', errors='ignore') parts = decoded_str.split('\x00') if len(parts) >= 3: info['computer_name'] = parts[0] info['user_name'] = parts[1] info['process_id'] = parts[2] # 可能还有更多字段... except: info['raw_hex'] = data.hex() return info def try_decrypt_with_known_keys(self, encrypted_b64_cookie: str): """如果未指定私钥,尝试使用已知私钥库进行解密""" for key_name, key_pem in self.known_keys.items(): try: self.private_key = serialization.load_pem_private_key( key_pem.encode(), password=None, backend=default_backend() ) result = self.decrypt_metadata(encrypted_b64_cookie) if result: print(f"[+] Successfully decrypted with known key: {key_name}") return result except Exception: continue print("[-] Failed to decrypt with any known key.") return None核心细节:
cryptography库的decrypt方法要求输入的长度必须与密钥长度匹配。CobaltStrike使用2048位RSA密钥,加密后的数据长度是256字节。我们提取的Base64 Cookie解码后也应该是256字节。如果不是,可能需要检查Base64解码是否正确,或者数据是否被截断/修改。
4.4 模块三:AES-CBC流量解密器实现
创建cs_aes_decryptor.py。一旦我们拿到了AES Session Key,解密后续流量就相对直接了。
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend import gzip import base64 class AESCBCCryptor: def __init__(self, aes_key_hex: str): """ :param aes_key_hex: 从元数据中解密得到的32字节AES密钥的16进制字符串 """ self.key = bytes.fromhex(aes_key_hex) if len(self.key) not in [16, 24, 32]: raise ValueError(f"AES key must be 16, 24, or 32 bytes long, got {len(self.key)}") # CobaltStrike Beacon 默认使用 AES-256-CBC if len(self.key) != 32: print(f"[!] Warning: Key length is {len(self.key)*8}-bit, not 256-bit. Decryption may fail.") def decrypt_payload(self, encrypted_data: bytes) -> bytes: """ 解密一个加密载荷。 CobaltStrike的格式通常是: [16字节 IV] + [加密数据] """ if len(encrypted_data) < 16: raise ValueError("Encrypted data too short to contain IV.") iv = encrypted_data[:16] ciphertext = encrypted_data[16:] cipher = Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=default_backend()) decryptor = cipher.decryptor() decrypted_padded = decryptor.update(ciphertext) + decryptor.finalize() # 去除PKCS#7填充 padding_len = decrypted_padded[-1] # 验证填充 if padding_len < 1 or padding_len > 16 or decrypted_padded[-padding_len:] != bytes([padding_len]) * padding_len: # 可能不是标准填充,或者解密失败,返回原始数据 print(f"[!] Warning: Invalid PKCS#7 padding detected. Padding byte: {padding_len}") # 尝试直接返回,可能数据本身未填充或使用了其他方式 return decrypted_padded return decrypted_padded[:-padding_len] def decrypt_and_decompress(self, encrypted_data: bytes): """ 解密并处理可能的后续编码/压缩。 CobaltStrike在AES加密后,数据可能还会进行Base64编码或Gzip压缩。 """ # 1. AES解密 decrypted = self.decrypt_payload(encrypted_data) # 2. 尝试Gzip解压 (常见于任务结果回传) try: decompressed = gzip.decompress(decrypted) print("[+] Payload was Gzip compressed.") return decompressed except gzip.BadGzipFile: # 3. 如果不是Gzip,尝试Base64解码 (常见于下载的文件数据) try: # 检查是否是ASCII可打印字符 if all(32 <= b <= 126 for b in decrypted[:100]): decoded = base64.b64decode(decrypted) print("[+] Payload was Base64 encoded after encryption.") return decoded else: # 4. 可能就是纯二进制数据(如Shellcode反射加载) return decrypted except Exception: return decrypted def batch_decrypt(self, encrypted_payloads: List[bytes]) -> List[Dict]: """批量解密多个载荷""" results = [] for i, payload in enumerate(encrypted_payloads): try: clear_data = self.decrypt_and_decompress(payload) # 尝试以UTF-8解码,如果是文本命令或结果 try: text_output = clear_data.decode('utf-8', errors='ignore') data_type = 'text' except: text_output = '' data_type = 'binary' results.append({ 'index': i, 'original_length': len(payload), 'decrypted_length': len(clear_data), 'data_type': data_type, 'preview': text_output[:200] if text_output else clear_data[:50].hex(), 'full_data': clear_data }) except Exception as e: results.append({ 'index': i, 'error': str(e), 'original_length': len(payload) }) return results避坑指南:AES-CBC解密最常见的坑在于填充(Padding)。CobaltStrike通常使用标准的PKCS#7填充。如果解密后去除填充失败,可能的原因有:1) AES密钥错误;2) IV提取错误(不是前16字节);3) 数据在传输中被损坏或截断;4) Beacon使用了非标准的配置。在编写解密逻辑时,一定要做好异常处理,并对解密后的数据进行合理性检查(例如,是否包含可读的ASCII字符串或预期的数据结构)。
5. 完整工具链集成与实战演示
现在,我们将三个模块组合起来,并模拟一个完整的实战分析流程。
假设我们有一个名为cobalt_traffic.pcap的文件,并且我们通过其他途径(比如从同一事件中提取的Beacon样本用1768.py分析)得知其使用的私钥是公开的已知密钥之一,保存在common_private_key.pem中。
我们创建一个主脚本cs_decrypt_tool.py:
#!/usr/bin/env python3 import sys from cs_pcap_parser import CobaltStrikePCAPParser from cs_rsa_decryptor import MetadataDecryptor from cs_aes_decryptor import AESCBCCryptor import json def main(pcap_file, private_key_file): print(f"[*] Starting analysis of {pcap_file}") # 1. 解析PCAP print("[*] Step 1: Parsing PCAP for CobaltStrike traffic...") parser = CobaltStrikePCAPParser(pcap_file) sessions = parser.extract_by_http_cookie('http') # 可以添加更精确的过滤器,如IP if not sessions: print("[-] No potential CobaltStrike HTTP sessions found.") # 可以尝试其他过滤器或特征 sessions = parser.extract_by_http_cookie('tcp.port == 8080') # 示例 print(f"[+] Found {len(sessions)} potential session(s).") # 2. 初始化RSA解密器 print(f"[*] Step 2: Loading private key from {private_key_file}...") decryptor = MetadataDecryptor(private_key_path=private_key_file) all_results = [] for i, sess in enumerate(sessions): print(f"\n[*] Processing Session {i+1}: {sess['src_ip']} -> {sess['dst_ip']}") result = {'session_info': sess, 'metadata': None, 'decrypted_payloads': []} # 3. 解密元数据 if sess['metadata_cookie']: print(f" [-] Found metadata cookie, length: {len(sess['metadata_cookie'])}") metadata = decryptor.decrypt_metadata(sess['metadata_cookie']) if metadata: result['metadata'] = metadata aes_key_hex = metadata['aes_key_hex'] print(f" [+] Metadata decrypted successfully!") print(f" AES Key: {aes_key_hex}") print(f" Computer Name: {metadata['system_info'].get('computer_name', 'N/A')}") print(f" User Name: {metadata['system_info'].get('user_name', 'N/A')}") # 4. 使用AES密钥解密所有载荷 if sess['encrypted_payloads'] and aes_key_hex: print(f" [-] Decrypting {len(sess['encrypted_payloads'])} encrypted payload(s)...") aes_cryptor = AESCBCCryptor(aes_key_hex) encrypted_data_list = [p['data'] for p in sess['encrypted_payloads']] decrypted_results = aes_cryptor.batch_decrypt(encrypted_data_list) for dr in decrypted_results: if 'error' not in dr: print(f" Payload {dr['index']}: {dr['data_type']}, Preview: {dr['preview']}") else: print(f" Payload {dr['index']}: ERROR - {dr['error']}") result['decrypted_payloads'] = decrypted_results else: print(f" [-] Failed to decrypt metadata with the provided key.") # 可以在这里触发尝试已知密钥库 # metadata = decryptor.try_decrypt_with_known_keys(sess['metadata_cookie']) else: print(f" [-] No metadata cookie found in this session. Cannot derive AES key.") # 如果没有元数据,但你有AES密钥(从其他途径),也可以直接解密载荷 all_results.append(result) # 5. 输出结果 output_file = 'decryption_results.json' with open(output_file, 'w') as f: json.dump(all_results, f, indent=2, default=str) # default=str处理bytes对象 print(f"\n[+] All done! Results saved to {output_file}") if __name__ == '__main__': if len(sys.argv) != 3: print(f"Usage: {sys.argv[0]} <path_to_pcap> <path_to_private_key.pem>") sys.exit(1) main(sys.argv[1], sys.argv[2])运行这个工具:
python cs_decrypt_tool.py cobalt_traffic.pcap common_private_key.pem如果一切顺利,你将得到一个decryption_results.json文件,里面包含了所有解密后的会话、元数据以及C2指令和回传数据。例如,你可能会看到解密后的数据包含"cmd /c whoami"、"beacon_tasklist"或文件传输的二进制内容。
6. 常见问题排查与进阶技巧
在实际操作中,你几乎一定会遇到各种问题。下面是我踩过坑后总结的排查清单和进阶思路。
6.1 问题排查速查表
| 问题现象 | 可能原因 | 排查步骤 |
|---|---|---|
| 无法从PCAP中找到元数据Cookie | 1. 流量不是HTTP协议(可能是HTTPS、DNS、SMB等)。 2. Beacon配置使用了自定义的Cookie名称或位置。 3. 元数据在首次通信后已清除。 | 1. 检查是否过滤了HTTPS (tls) 流量,需要解密TLS或关注证书。2. 检查所有HTTP请求头,寻找长的Base64字符串。 3. 尝试使用 1768.py分析捕获到的Beacon样本,确认通信配置。 |
| RSA解密元数据失败 | 1. 私钥不匹配(不是该流量使用的密钥对)。 2. 提取的Cookie数据不正确(编码问题、截断)。 3. 填充方式不对(CobaltStrike默认PKCS1v15)。 | 1. 确认私钥来源。使用1768.py检查Beacon样本是否标记为已知密钥。2. 验证Base64解码后的长度是否为256字节(2048位RSA)。 3. 尝试使用 OAEP填充方式(较少见)。 |
| AES解密后是乱码 | 1. AES密钥错误。 2. IV提取位置错误(不是载荷前16字节)。 3. 加密模式不是CBC(可能是其他模式,极罕见)。 4. 解密后的数据还需进一步处理(Gzip/Base64)。 | 1. 反复核对从元数据中解析出的AES密钥。 2. 确认载荷结构。可以尝试不同的偏移量提取IV。 3. 在 AESCBCCryptor.decrypt_payload方法中打印IV和密钥进行核对。4. 确保执行了 decrypt_and_decompress中的后续处理步骤。 |
| 解密出的指令不完整或格式奇怪 | 1. Beacon使用了“Chunked”传输模式,数据被分片。 2. 解密数据是序列化的Java或自定义任务对象。 | 1. 需要按照Beacon的协议重组分片数据。这需要更深入的逆向分析。 2. 对于常见任务类型(如文件下载、端口扫描),需要编写特定的解析器。参考开源项目 DissectCobaltStrike的解析逻辑。 |
| 工具运行速度极慢 | 使用pyshark实时解析大型PCAP。 | 1. 先用tshark导出关键字段到JSON或CSV,再用Python处理。2. 使用 scapy的PcapReader,但对于大型文件内存消耗大。 |
6.2 进阶技巧与扩展思路
自动化密钥库集成:将公开的已知CobaltStrike私钥(例如从NVISO报告、Malleable C2 Profiles仓库中收集的)集成到你的工具中。实现一个功能:当没有提供私钥时,自动遍历密钥库尝试解密元数据Cookie。这能极大提高对使用流行破解版C2流量的分析效率。
支持Malleable C2 Profile:高级攻击者会使用Malleable C2配置文件自定义通信的方方面面,包括URI路径、头部字段、数据编码方式等。你的解析器需要足够灵活,允许用户通过配置文件指定如何提取Cookie和载荷。例如,可以从Profile中读取
metadata段落的base64和header "Cookie"等设置。从内存镜像或进程转储中提取密钥:在应急响应中,你可能面对的是一个运行中的Beacon进程。此时,可以从进程内存中搜索RSA私钥或AES会话密钥。私钥在团队服务器内存中,而AES会话密钥存在于每个Beacon进程的内存中。使用Volatility或Rekall等内存取证工具,结合
yara规则扫描特定模式(如RSA密钥的ASN.1结构头0x30, 0x82...),可以实现在没有网络流量情况下的解密。解密HTTPS流量:如果C2通信使用了HTTPS,你需要服务器私钥或会话密钥来解密TLS。在红队演练或可控环境中,可以通过在客户端或服务器端导入自定义CA证书进行中间人解密。在真实威胁分析中,这非常困难,但有时可以从受害主机提取TLS会话密钥(如果系统配置了
SSLKEYLOGFILE环境变量),再在Wireshark中解密。与威胁情报平台联动:将解密出的C2地址、证书哈希、攻击者指令等IoC自动提交到内部或外部的威胁情报平台,实现自动化标注和狩猎。
构建这样一个工具的过程,本身就是对CobaltStrike最深入的学习。它迫使你去理解每一个字节的含义,每一种加密算法的应用场景。当你第一次成功解密出攻击者的whoami命令时,那种穿透迷雾、直击本质的成就感,是使用任何现成工具都无法比拟的。这个项目不仅给你一个实用的工具,更给你一套分析加密C2通信的方法论,让你在面对下一个未知的恶意软件家族时,能有章可循,从容应对。
