当前位置: 首页 > news >正文

异步爬虫 aiohttp 进阶实战——高并发采集的正确姿势

前面的爬虫都是同步的——发一个请求等着回来,再发下一个。遇到大规模采集时,同步 IO 等在网络上的时间占了 90%,CPU 一直在空闲。

aiohttp+asyncio做异步爬虫,同时发出几十个请求,总时间从几小时压缩到十几分钟。

一、同步 vs 异步 的核心区别

# 同步:一个一个来,总共 10 秒# 请求1 → 等1秒 → 请求2 → 等1秒 → ... → 请求10 → 等1秒# 总时间 = 10秒# 异步:同时发出,总共 1 秒# 请求1 → 等1秒 → 返回# 请求2 → 等1秒 → 返回# ... (1秒后全部返回)# 请求10 → 等1秒 → 返回# 总时间 ≈ 1秒

异步适合 IO 密集型任务(网络请求、文件读写),不适合 CPU 密集型(图片处理、数据计算)。

二、aiohttp 基础

1. 安装

pipinstallaiohttp

2. 最简单的异步请求

importaiohttpimportasyncioasyncdeffetch(url):"""异步请求一个 URL"""asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url)asresp:# 返回文本内容returnawaitresp.text()# 执行html=asyncio.run(fetch("https://example.com"))print(html[:200])

3. 并发请求多个 URL

importaiohttpimportasyncioimporttimeasyncdeffetch_one(session,url):"""单个请求"""try:asyncwithsession.get(url,timeout=10)asresp:returnawaitresp.text()exceptExceptionase:returnf"请求失败:{e}"asyncdeffetch_all(urls):"""并发请求所有 URL"""asyncwithaiohttp.ClientSession()assession:tasks=[fetch_one(session,url)forurlinurls]results=awaitasyncio.gather(*tasks)returnresults# 使用urls=[f"https://example.com/page/{i}"foriinrange(1,21)]start=time.time()results=asyncio.run(fetch_all(urls))print(f"总耗时:{time.time()-start:.2f}秒")print(f"共获取{len(results)}个页面")

同步写法跑 20 个页面要 20 秒以上,异步跑大约 1-2 秒(取决于网络)。

三、控制并发数——信号量

如果不控制并发,一下子发出几百个请求,很可能被网站封 IP 或自己电脑连接数不够。

importaiohttpimportasyncioclassAsyncCrawler:"""带并发控制的异步爬虫"""def__init__(self,max_concurrency=10):# 信号量:控制最大并发数self.semaphore=asyncio.Semaphore(max_concurrency)self.results=[]asyncdeffetch(self,session,url):"""带并发限制的请求"""asyncwithself.semaphore:# 超过 max_concurrency 会等待try:asyncwithsession.get(url,timeout=10)asresp:text=awaitresp.text()print(f"完成:{url}({len(text)}字符)")return(url,text)exceptExceptionase:print(f"失败:{url}-{e}")return(url,None)asyncdefcrawl(self,urls):"""批量爬取"""asyncwithaiohttp.ClientSession()assession:tasks=[self.fetch(session,url)forurlinurls]self.results=awaitasyncio.gather(*tasks)returnself.resultsdefsave_results(self,filename="results.json"):"""保存结果"""importjson data=[]forurl,contentinself.results:ifcontent:data.append({"url":url,"length":len(content)})withopen(filename,"w",encoding="utf-8")asf:json.dump(data,f,ensure_ascii=False,indent=2)print(f"已保存{len(data)}条结果到{filename}")# 使用:同时最多 10 个请求crawler=AsyncCrawler(max_concurrency=10)urls=[f"https://example.com/page/{i}"foriinrange(1,101)]importtime start=time.time()results=asyncio.run(crawler.crawl(urls))print(f"总耗时:{time.time()-start:.2f}s")crawler.save_results()

四、异步爬取 + 解析

可以用asyncio.Queue做生产者-消费者模式:

importaiohttpimportasynciofrombs4importBeautifulSoupasyncdefworker(name,queue,session,results):"""消费者:从队列取 URL 并爬取"""whileTrue:url=awaitqueue.get()try:asyncwithsession.get(url)asresp:html=awaitresp.text()# 解析soup=BeautifulSoup(html,"html.parser")title=soup.title.stringifsoup.titleelse"无标题"results.append({"url":url,"title":title})print(f"[{name}] 完成:{url}{title}")exceptExceptionase:print(f"[{name}] 失败:{url}-{e}")finally:queue.task_done()asyncdefmain(urls,concurrency=10):"""主入口:生产者+消费者模式"""queue=asyncio.Queue()results=[]# 生产者:往队列放 URLforurlinurls:awaitqueue.put(url)asyncwithaiohttp.ClientSession()assession:# 创建 N 个消费者协程workers=[asyncio.create_task(worker(f"worker-{i}",queue,session,results))foriinrange(concurrency)]# 等待队列处理完毕awaitqueue.join()# 取消所有 workerforwinworkers:w.cancel()returnresults# 使用urls=[f"https://example.com/page/{i}"foriinrange(1,51)]results=asyncio.run(main(urls,concurrency=10))print(f"\n共爬取{len(results)}个页面")forrinresults[:5]:print(f"{r['url']}{r['title']}")

五、超时与重试

1. 设置超时

asyncdeffetch_with_timeout(session,url):"""带超时的请求"""try:# 总超时30秒,连接超时10秒timeout=aiohttp.ClientTimeout(total=30,connect=10)asyncwithsession.get(url,timeout=timeout)asresp:returnawaitresp.text()exceptasyncio.TimeoutError:print(f"超时:{url}")returnNone

2. 自动重试

asyncdeffetch_with_retry(session,url,max_retries=3):"""带重试的请求"""forattemptinrange(max_retries):try:asyncwithsession.get(url,timeout=10)asresp:ifresp.status==200:returnawaitresp.text()else:print(f"状态码异常{resp.status}:{url}")exceptExceptionase:print(f"第{attempt+1}次失败:{url}-{e}")awaitasyncio.sleep(2**attempt)# 指数退避:1s、2s、4sreturnNone

六、异步 + 代理

asyncdeffetch_with_proxy(session,url,proxy):"""使用代理"""try:asyncwithsession.get(url,proxy=proxy,timeout=10)asresp:returnawaitresp.text()exceptExceptionase:print(f"代理{proxy}请求失败:{e}")returnNoneasyncdefcrawl_with_proxies(urls,proxies):"""使用代理池并发爬取"""asyncwithaiohttp.ClientSession()assession:tasks=[]fori,urlinenumerate(urls):proxy=proxies[i%len(proxies)]tasks.append(fetch_with_proxy(session,url,proxy))returnawaitasyncio.gather(*tasks)

七、异步爬虫的最佳实践

并发数设置

10 个并发 → 阿里云等大网站基本没压力 20 个并发 → 多数小网站也扛得住 50 个并发 → 可能触发反爬 100+ 个并发 → 被 ban 概率极高,且本地连接数可能不够用

建议从 5-10 个并发开始,慢慢往上加。

完整模板

importaiohttpimportasyncioimporttimefromtypingimportList,DictclassBaseAsyncCrawler:"""异步爬虫基类"""def__init__(self,max_concurrency=10,delay=0):self.max_concurrency=max_concurrency self.delay=delay# 请求间隔(秒)self.semaphore=asyncio.Semaphore(max_concurrency)self.session=Noneasyncdef__aenter__(self):self.session=aiohttp.ClientSession()returnselfasyncdef__aexit__(self,*args):awaitself.session.close()asyncdeffetch(self,url:str)->str:"""单个请求"""asyncwithself.semaphore:try:asyncwithself.session.get(url,timeout=10)asresp:ifself.delay:awaitasyncio.sleep(self.delay)returnawaitresp.text()exceptExceptionase:print(f"请求失败{url}:{e}")return""asyncdefcrawl(self,urls:List[str])->List[str]:"""批量爬取"""tasks=[self.fetch(url)forurlinurls]returnawaitasyncio.gather(*tasks)# 使用asyncdefmain():urls=[f"https://example.com/page/{i}"foriinrange(10)]asyncwithBaseAsyncCrawler(max_concurrency=5)ascrawler:start=time.time()results=awaitcrawler.crawl(urls)print(f"完成{len(results)}个请求,耗时{time.time()-start:.2f}s")asyncio.run(main())

八、异步 vs 多线程 怎么选

对比异步(aiohttp)多线程(requests+ThreadPool)
性能✅ 极高,几千并发没问题❌ 受限于 GIL 和线程切换
代码⭐⭐ 需要 async/await 语法⭐ 简单,不用学新语法
调试⭐⭐ 稍麻烦⭐ 容易
适用大规模采集(上万条)中小规模(几千条)

建议:

  • 爬几千条数据,用requests + ThreadPoolExecutor就够了
  • 爬几万条以上,上aiohttp异步
  • 不要为了异步而异步,简单够用优先

💡 觉得有用的话,点赞 + 关注【张老师技术栈】吧!每周更新 Java/Python/爬虫 实战干货,不让你白来。

http://www.jsqmd.com/news/1090691/

相关文章:

  • 鸿蒙 ArkTS 实战:Lab Record Book 从状态建模到交互闭环完整解析
  • Python 知识体系深度解析与学习指南
  • 【操作系统】经典同步问题:生产者-消费者
  • 李宏毅深度学习课程集成学习学习报告
  • AI模型能力演进与安全发布机制解析
  • 3分钟掌握HS2-HF Patch:一站式汉化去码解决方案终极指南
  • 93亿反杀800亿!Ideogram 4登顶开源之王,设计师要失业了?
  • 2026年想找靠谱的金相显微镜工厂 这些实用选购干货值得你参考
  • Android binder(RPC) 通信概念与架构
  • Gemini原生多模态:统一表示空间与跨模态因果推理
  • TVA在具身智能产业化体系的落地案例详解(4)
  • 文件上传漏洞防御实战:从原理到PHP安全实现
  • 15分钟构建专业级黑苹果配置:OpCore-Simplify的智能化解决方案
  • SN65DSI8X视频桥接芯片硬件设计:从电源管理到高速信号完整性实战
  • 为什么你的ChatGPT API账单比同行高3.2倍?——GPT-4 Turbo vs GPT-3.5 Turbo的11项成本对比实验报告
  • Dalín X 意识框架实测数据报告
  • 技术桥接中的抽象分离与实现独立
  • 终极内存检测指南:5步彻底解决电脑蓝屏和死机问题
  • 鸿蒙 ArkTS 实战:Essay Material Library 从状态建模到交互闭环完整解析
  • 【声呐仿真】实战指南:从零部署DAVE与UUV Simulator完整环境
  • AI论文写作软件推荐
  • WorkshopDL:高效便捷的跨平台Steam创意工坊下载解决方案
  • 星皓 MDM.Plus 是什么?面向手机租赁和企业设备管理的一站式 MDM 解决方案
  • 3大核心技术揭秘:Memtest86+如何成为内存故障诊断的金标准
  • 从《视若无睹》到技术洞察:当观察力成为产品经理的核心武器
  • 这5个被99%开发者忽略的DeepSeek优势,正让ChatGPT用户连夜重构架构(CUDA优化细节、MoE激活率、KV Cache压缩率独家披露)
  • A股量化,单策略真的不够用了:我开源了一个双策略自动切换框架
  • 如何三步获取阿里云盘Refresh Token?解锁云盘自动化管理新体验
  • DAC81408评估板实战指南:从硬件连接到软件配置与多通道信号生成
  • 代码处理doc文档