当前位置: 首页 > news >正文

asyncio+queue实现生产者消费者爬虫模型

在网络爬虫开发中,生产者 - 消费者模型是经典且高效的架构模式。它将 “任务生产(URL 采集)” 和 “任务消费(页面爬取)” 解耦,能有效控制并发、避免资源浪费。而 Python 的asyncio(异步 I/O)结合asyncio.Queue,可以打造出高性能的异步爬虫,相比传统多线程 / 多进程爬虫,异步模型在 IO 密集型的爬虫场景下效率提升显著。

一、核心原理

1. 生产者 - 消费者模型

  • 生产者:负责生成待爬取的 URL,将其放入队列中,是爬虫的 “任务源”;
  • 消费者:从队列中取出 URL,执行页面请求、数据解析等操作,是爬虫的 “执行端”;
  • 队列(asyncio.Queue):作为生产者和消费者之间的缓冲,解耦两者的执行节奏,控制并发数量,避免瞬间请求量过大导致的被封 IP 或服务器压力过高。

2. asyncio 的优势

asyncio是 Python 内置的异步编程框架,基于事件循环实现,无需创建大量线程 / 进程,仅通过单线程的协程切换就能处理大量 IO 操作(如网络请求),资源开销远低于多线程,适合爬虫这类高 IO 场景。

二、完整实现代码

以下是一个可直接运行的异步爬虫示例,以爬取某测试站点的页面标题为例,完整实现生产者 - 消费者模型:

python

运行

import asyncio import aiohttp from aiohttp import ClientTimeout from typing import List # 全局配置 MAX_CONCURRENT = 5 # 最大并发消费者数量 QUEUE_MAXSIZE = 10 # 队列最大容量,控制缓冲大小 BASE_URL = "https://httpbin.org/get?page={}" # 测试URL PAGE_RANGE = range(1, 21) # 待爬取的页面范围(1-20页) class AsyncCrawler: def __init__(self): # 初始化异步队列,设置最大容量 self.queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE) # 存储爬取结果 self.results = [] async def producer(self, urls: List[str]): """生产者:将待爬取的URL放入队列""" for url in urls: await self.queue.put(url) print(f"生产者:已放入URL -> {url}") # 放入结束标记(数量等于消费者数量),通知消费者退出 for _ in range(MAX_CONCURRENT): await self.queue.put(None) async def consumer(self, session: aiohttp.ClientSession, consumer_id: int): """消费者:从队列取出URL并爬取""" while True: # 从队列获取URL(异步阻塞,直到有数据) url = await self.queue.get() # 检测结束标记 if url is None: print(f"消费者{consumer_id}:收到结束标记,退出") self.queue.task_done() break try: # 异步请求页面 async with session.get(url, timeout=ClientTimeout(total=10)) as response: if response.status == 200: # 解析响应数据(此处仅示例,可替换为实际解析逻辑) data = await response.json() page = data["args"]["page"] self.results.append(f"页面{page}爬取成功") print(f"消费者{consumer_id}:成功爬取 -> {url}") else: print(f"消费者{consumer_id}:爬取失败,状态码 -> {response.status}") except Exception as e: print(f"消费者{consumer_id}:爬取异常 -> {url},错误:{str(e)}") finally: # 标记任务完成(用于队列的join()方法) self.queue.task_done() async def run(self): """启动爬虫主流程""" # 1. 生成待爬取的URL列表 urls = [BASE_URL.format(page) for page in PAGE_RANGE] # 2. 创建异步HTTP会话(复用连接,提升性能) async with aiohttp.ClientSession() as session: # 3. 创建并启动生产者任务 producer_task = asyncio.create_task(self.producer(urls)) # 4. 创建并启动多个消费者任务 consumer_tasks = [] for i in range(MAX_CONCURRENT): task = asyncio.create_task(self.consumer(session, i+1)) consumer_tasks.append(task) # 5. 等待生产者任务完成 await producer_task # 6. 等待队列中所有任务处理完成 await self.queue.join() # 7. 等待所有消费者任务退出 await asyncio.gather(*consumer_tasks) # 8. 输出爬取结果 print("\n===== 爬取完成 =====") for res in self.results: print(res) print(f"总计成功爬取:{len(self.results)} 条数据") if __name__ == "__main__": # 运行异步爬虫 crawler = AsyncCrawler() asyncio.run(crawler.run())

三、代码核心解析

1. 队列初始化

self.queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)初始化异步队列,maxsize限制队列最大容量,避免生产者生产过快导致内存溢出。

2. 生产者逻辑

  • 遍历 URL 列表,通过await self.queue.put(url)将 URL 放入队列;
  • 生产完成后,放入与消费者数量相同的None作为结束标记,确保每个消费者都能收到退出信号。

3. 消费者逻辑

  • 循环从队列取 URL:url = await self.queue.get()(异步阻塞,无数据时等待);
  • 检测到None时退出循环,完成消费者任务;
  • 使用aiohttp.ClientSession发起异步 HTTP 请求,复用连接池提升效率;
  • 无论爬取成功 / 失败,最终调用self.queue.task_done()标记任务完成,供队列join()方法检测。

4. 主流程控制

  • asyncio.create_task()创建协程任务,实现生产者和多个消费者的并发执行;
  • await self.queue.join()阻塞直到队列中所有任务都被task_done()标记,确保所有 URL 都被处理;
  • asyncio.gather(*consumer_tasks)等待所有消费者任务正常退出。

四、关键优化点

  1. 连接复用:使用aiohttp.ClientSession而非每次请求创建新会话,减少 TCP 连接建立开销;
  2. 并发控制:通过MAX_CONCURRENT限制消费者数量,避免并发过高触发反爬;
  3. 异常处理:捕获请求过程中的异常,避免单个 URL 爬取失败导致消费者退出;
  4. 队列缓冲:设置QUEUE_MAXSIZE,当队列满时生产者会阻塞,实现生产消费的速率匹配。

五、适用场景与扩展

1. 适用场景

  • 高并发爬取大量网页(如电商商品页、新闻列表页);
  • 需控制爬取速率,避免目标网站反爬;
  • IO 密集型爬取任务(大部分时间消耗在网络请求上)。

2. 扩展方向

  • 去重机制:添加 URL 去重(如使用集合),避免重复爬取;
  • 重试机制:对失败的 URL 进行重试,提升爬取成功率;
  • 数据存储:将爬取结果写入数据库(如 MySQL、MongoDB),需使用异步数据库驱动;
  • 代理池集成:添加异步代理池,随机切换代理 IP,避免被封;
  • 任务持久化:将队列中的 URL 持久化到文件 / 数据库,实现断点续爬。

总结

  1. asyncio + Queue实现的生产者 - 消费者爬虫,核心是通过异步队列解耦生产(URL 生成)和消费(页面爬取),利用异步 IO 提升爬取效率;
  2. 关键控制点包括:并发数限制、队列容量限制、异常处理、结束标记传递;
  3. 该模型兼具高性能和可控性,是异步爬虫开发的经典架构,可根据实际需求扩展去重、重试、代理等功能。

相比传统同步爬虫,异步生产者 - 消费者模型在处理大量 IO 密集型任务时,能以更低的资源开销实现更高的并发量,是 Python 爬虫开发中值得掌握的核心技术。

http://www.jsqmd.com/news/325511/

相关文章:

  • 基于ssm的考研复习平台w0ws1848(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,架构界面在最后面。
  • 如何用Python搭建一个网站
  • 2026年技巧:用抖音训练测试AI模型
  • 互联网大厂Java面试实录:核心技术栈与业务场景深度解析
  • 抽奖系统推荐!年会必备神器!老司机专用神器!
  • Java中随机数生成_java 随机数,零基础入门到精通,收藏这篇就够了
  • 开源工具如何让测试报告美感飙升200%:专业解析与实战指南
  • 深入解析:iOS开发:关于日志框架
  • 2026年当下头部门窗采购有哪些,平移断桥提升窗/侧压平移推拉窗/推拉窗/门窗/安全门窗/窗纱一体铝门窗,门窗采购排行
  • 软件测试公众号爆款内容解析:专业洞察与AI应用
  • Docker核心问题汇总(含原理、操作、网络全解析)
  • adobe acrobat软件可以-另存为-缩小大小-可以将23MB文件转成1MB,太牛逼了
  • [嵌入式系统-171]:直流电机通过PWM信号控制转速和连续运转;步进电机通过脉冲信号实现开环控制,每接收一个脉冲转动一个固定步距角;伺服电机则通过PWM脉宽指令结合内部反馈实现闭环控制,精确控制
  • 2026效率革命:AI会议纪要转测试需求的实战指南
  • Java SE 面向对象
  • 推荐专业的研究院转让公司,全国范围内哪家比较靠谱?
  • Python开发平台怎么选?核心功能与场景匹配指南
  • 低代码测试平台的隐私合规陷阱:软件测试从业者的专业避坑指南
  • 2026年行业内做得好的四边封包装袋批发厂家口碑推荐,中封袋/自立袋/纹路袋/聚酯尼龙袋,四边封包装袋订做厂家哪家靠谱
  • 2026年企业服务性价比大比拼,聊聊中企优帮的市场竞争力和费用详情
  • cycler复数形式是什么?cyclers用法解析
  • 量子算法测试结果解析工具的核心原理与应用价值
  • 408真题解析-2010-21-计组操作系统-中断执行顺序
  • 2026年上门按摩平台权威解析与推荐盘点:五大平台综合深度评估
  • 探讨推荐的AI办公鼠标企业哪家口碑好
  • 字为基·星为途——汉语何以领跑科技时代,领航星际文明?
  • 2026年螺杆式冷水机生产厂售后好的品牌口碑排行榜
  • 收藏备用|小白程序员必看!RAG技术详解,轻松搞定大模型“胡说八道”难题
  • ‌2026年测试数据生成需求暴增:成因、热度与从业者突围指南
  • 2026年马鞍山3M授权加盟连锁店口碑排名