当前位置: 首页 > news >正文

多线程+asyncio端口扫描器

#!/usr/bin/env python3
"""
多线程+asyncio端口扫描器
结合了异步IO和多线程技术,实现高效的端口扫描功能
"""import asyncio
import socket
import threading
import time
import argparse
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Dictclass AsyncPortScanner:"""异步端口扫描器类使用asyncio和ThreadPoolExecutor实现高效的并发端口扫描"""def __init__(self, target: str, start_port: int, end_port: int, max_threads: int = 100, timeout: float = 1.0):"""初始化端口扫描器Args:target: 目标IP地址或域名start_port: 起始端口号end_port: 结束端口号max_threads: 最大线程数timeout: 连接超时时间(秒)"""self.target = targetself.start_port = start_portself.end_port = end_portself.max_threads = max_threadsself.timeout = timeoutself.open_ports = []  # 存储发现的开放端口self.lock = threading.Lock()  # 线程锁,保护共享资源self.total_ports = end_port - start_port + 1self.scanned_ports = 0async def scan_port(self, port: int) -> bool:"""异步扫描单个端口Args:port: 要扫描的端口号Returns:bool: 端口是否开放"""try:# 使用asyncio的run_in_executor在线程池中执行同步的socket操作# 这样可以在不阻塞事件循环的情况下执行耗时的网络IO操作loop = asyncio.get_event_loop()with ThreadPoolExecutor(max_workers=1) as executor:result = await loop.run_in_executor(executor, self._sync_connect, port)return resultexcept Exception as e:return Falsedef _sync_connect(self, port: int) -> bool:"""同步连接测试方法在线程池中执行,测试指定端口是否开放Args:port: 要测试的端口号Returns:bool: 连接是否成功"""try:# 创建TCP socketsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(self.timeout)# connect_ex()方法返回0表示连接成功result = sock.connect_ex((self.target, port))sock.close()return result == 0except Exception:return Falseasync def scan_port_range_chunk(self, start: int, end: int) -> List[int]:"""扫描端口范围块将大范围的端口分成小块进行并发扫描Args:start: 起始端口end: 结束端口Returns:List[int]: 该端口范围内的开放端口列表"""tasks = []# 为每个端口创建异步任务for port in range(start, end + 1):task = asyncio.create_task(self.scan_port_with_update(port))tasks.append(task)# 并发执行所有端口扫描任务results = await asyncio.gather(*tasks)# 筛选出开放的端口open_ports = [start + i for i, is_open in enumerate(results) if is_open]return open_portsasync def scan_port_with_update(self, port: int) -> bool:"""扫描端口并更新进度显示在扫描过程中实时更新进度条和发现的开放端口Args:port: 要扫描的端口号Returns:bool: 端口是否开放"""is_open = await self.scan_port(port)# 使用线程锁保护共享变量的更新
        with self.lock:self.scanned_ports += 1progress = (self.scanned_ports / self.total_ports) * 100print(f"\r扫描进度: {self.scanned_ports}/{self.total_ports} ({progress:.1f}%)", end="")# 如果发现开放端口,立即显示if is_open:with self.lock:self.open_ports.append(port)print(f"\n[+] 发现开放端口: {port}")return is_openasync def run_scan(self) -> List[int]:"""运行完整的端口扫描流程Returns:List[int]: 所有发现的开放端口列表"""print(f"开始扫描 {self.target} 的端口 {self.start_port}-{self.end_port}")print(f"使用 {self.max_threads} 个线程,超时时间: {self.timeout}秒")start_time = time.time()# 将端口范围分成多个块,每个块由一个线程处理# 这种分块策略可以更好地利用系统资源chunk_size = max(1, self.total_ports // self.max_threads)tasks = []# 为每个端口块创建异步任务for i in range(self.start_port, self.end_port + 1, chunk_size):chunk_end = min(i + chunk_size - 1, self.end_port)task = asyncio.create_task(self.scan_port_range_chunk(i, chunk_end))tasks.append(task)# 等待所有端口块扫描完成chunk_results = await asyncio.gather(*tasks)# 合并所有块的结果for chunk_ports in chunk_results:self.open_ports.extend(chunk_ports)end_time = time.time()elapsed_time = end_time - start_time# 显示扫描统计信息print(f"\n\n扫描完成!")print(f"扫描耗时: {elapsed_time:.2f}秒")print(f"发现 {len(self.open_ports)} 个开放端口")return sorted(self.open_ports)class PortScannerService:"""多目标扫描服务类管理多个目标的并发扫描任务"""def __init__(self):"""初始化扫描服务"""self.scan_results: Dict[str, List[int]] = {}  # 存储所有目标的扫描结果
    async def scan_multiple_targets(self, targets: List[str], start_port: int, end_port: int,max_threads: int = 100) -> Dict[str, List[int]]:"""并发扫描多个目标Args:targets: 目标IP或域名列表start_port: 起始端口end_port: 结束端口max_threads: 每个目标的最大线程数Returns:Dict[str, List[int]]: 每个目标对应的开放端口字典"""print(f"开始扫描 {len(targets)} 个目标...")# 为每个目标创建独立的扫描器实例tasks = []for target in targets:scanner = AsyncPortScanner(target, start_port, end_port, max_threads)task = asyncio.create_task(scanner.run_scan())tasks.append((target, task))# 并发执行所有扫描任务for target, task in tasks:try:open_ports = await taskself.scan_results[target] = open_portsexcept Exception as e:print(f"扫描 {target} 时出错: {e}")self.scan_results[target] = []return self.scan_resultsdef display_results(self):"""格式化显示所有目标的扫描结果提供清晰的汇总报告"""print("\n" + "="*50)print("扫描结果汇总:")print("="*50)for target, ports in self.scan_results.items():print(f"\n目标: {target}")if ports:print(f"开放端口 ({len(ports)}个): {', '.join(map(str, ports))}")else:print("未发现开放端口")async def main():"""主函数处理命令行参数并启动端口扫描"""# 创建命令行参数解析器parser = argparse.ArgumentParser(description="多线程+asyncio端口扫描器")parser.add_argument("target", help="目标IP或域名")parser.add_argument("-s", "--start", type=int, default=1, help="起始端口 (默认: 1)")parser.add_argument("-e", "--end", type=int, default=1024, help="结束端口 (默认: 1024)")parser.add_argument("-t", "--threads", type=int, default=100, help="最大线程数 (默认: 100)")parser.add_argument("--timeout", type=float, default=1.0, help="连接超时时间(秒) (默认: 1.0)")# 解析命令行参数args = parser.parse_args()# 创建并配置扫描器scanner = AsyncPortScanner(target=args.target,start_port=args.start,end_port=args.end,max_threads=args.threads,timeout=args.timeout)# 运行扫描open_ports = await scanner.run_scan()# 显示最终结果if open_ports:print(f"\n{args.target} 的开放端口: {', '.join(map(str, open_ports))}")else:print(f"\n{args.target} 没有发现开放端口")if __name__ == "__main__":# 启动异步主程序asyncio.run(main())

 

#!/usr/bin/env python3
import asyncio
from port_scanner import PortScannerServiceasync def demo():"""演示多目标扫描功能"""service = PortScannerService()# 扫描多个目标targets = ["127.0.0.1", "localhost"]start_port = 1end_port = 1000max_threads = 50print(f"开始扫描 {len(targets)} 个目标的端口 {start_port}-{end_port}")results = await service.scan_multiple_targets(targets=targets,start_port=start_port,end_port=end_port,max_threads=max_threads)# 显示结果
    service.display_results()if __name__ == "__main__":asyncio.run(demo())

 

http://www.jsqmd.com/news/53044/

相关文章:

  • U635735 Treap=Tree+Heap
  • Docker客户端控制局域网服务器 - a-cool
  • 时序约束记录
  • U635732 木叶下
  • U635734 神机
  • 2025深圳粉末冶金展机构权威推荐榜单:2025青岛家博会‌/2025深圳跨境电商展‌/2025新加坡海鲜展源头机构精选
  • U635730 二叉树
  • 2025年宽幅等离子清洗机优质厂家权威推荐榜单:真空等离子清洗机/大气等离子清洗机/等离子体清洗机源头厂家精选
  • 深入解析:简单、高效且低成本的预训练、微调与服务,惠及大众基于 Ray 架构设计的覆盖大语言模型(LLM)完整生命周期的解决方案byzer-llm
  • CF1985G-D-Function
  • 2025 年义乌礼品定制厂家最新推荐榜,聚焦企业生产能力、服务水平与市场认可度多维度解析定制商务礼品 / 公司礼品定制 / 纪念品定制 / 定制伴手礼 / 企业礼品定制 / 客户礼品定制公司推荐
  • U636118 二叉搜索树
  • 2025年口碑好的四川种苗基地排名及采购参考
  • 2025 年义乌商务礼品厂家最新推荐榜,全链条能力与定制服务双维度深度解析商务伴手礼/商务礼品网/定制商务礼品/商务福利礼品/商务实用礼品公司推荐
  • egacy(传统) nftables(较新) 和后端ipvs iptables有关系吗
  • 2025 年透声膜厂家最新推荐榜,技术实力与市场口碑深度解析手机防水/MIC 防水/耳机防水/手表防水/摄像头防水/监控防水/无氟防水/ePTFE 防水透声膜公司推荐
  • KlineCharts对接股票k线数据 股票数据源API
  • 2025年抗气爆O形圈厂家权威推荐榜单:橡胶扶正器/V3级胶筒/震击器源头厂家精选
  • 2025年ai智能体推荐公司权威推荐榜单:智能体搜索‌/aigeo‌/AIGEO源头公司精选
  • 右击转到定义,f12会跳转到错误的方法上
  • 2025年企业内部知识库私有化部署服务商全景指南:选型必读——聚焦AI模型与Deepseek方案,贯通知识库与智能BI本地部署的技术演进与厂商矩阵
  • 2025年核心年核心方案商遴选指南:企业智能BI私有化部署厂商与AI知识库(含DeepSeek)部署方案商综合解析
  • 2025企业知识管理破局:AI知识库与智能BI私有化部署实战路径(含知识库部署服务商、AI知识库部署方案商、BI私有化部署方案商全景梳理)
  • [H3C/华三]Super VLAN技术简述与配置
  • 2025年工字钢弯管直销厂家权威推荐榜单:圆管弯管‌/铝型材弯管‌/中频热弯管源头厂家精选
  • 留学中介排名TOP10重磅发布,谁是申请服务标杆
  • 推荐几家ins推广公司,五家效果不错的ins营销服务商盘点
  • 2025企业智能BI与知识库本地化部署实力厂商全景透视:从BI私有化、AI知识库到DeepSeek专有方案,方案,谁在定义数据新基座?
  • 排名榜单重磅来袭,关注优质十大留学机构
  • 国际物流公司优选指南:国际物流主流企业综合对比分析