当前位置: 首页 > news >正文

Python 爬虫分布式实战:Redis + 多进程爬虫实现分布式数据采集与任务分片

前言

单机爬虫受 CPU、网络带宽、IP 资源限制,在海量站点全量抓取场景中存在采集速度慢、任务无法拆分、故障难以续爬等短板,分布式爬虫通过任务拆解、多机器协同调度突破单机性能瓶颈。Redis 凭借高性能内存读写、数据结构丰富的特性成为分布式爬虫主流中间件,依托 List、Set、SortedSet 数据结构实现任务队列分发、去重过滤、采集状态标记,结合 Python 多进程、多线程实现单机资源最大化利用,搭配任务分片策略将海量目标 URL 拆分至多台爬虫节点并行消费。本文从分布式爬虫架构设计、Redis 各类数据结构落地场景、任务生产消费模型、URL 布隆去重、多进程任务隔离、宕机续爬机制、分布式锁防重复抓取全维度落地实战,配套完整可运行工程化代码与底层原理剖析,覆盖中小型分布式爬虫从开发到部署全流程。

本文所需依赖官方文档超链接:

  1. Redis-Py 官方开发文档
  2. Requests 官方文档
  3. BeautifulSoup4 官方文档
  4. SQLAlchemy 官方文档

一、分布式爬虫整体架构与 Redis 核心选型原理

1.1 三层分布式爬虫架构组成

分布式爬虫划分为任务生产端、分布式任务中间件、爬虫消费节点三层结构,三层解耦可独立部署扩容:

  1. 任务生产端:负责解析站点首页、分类页,批量生成待采集商品 / 资讯 URL,统一推送至 Redis 任务队列,仅做任务产出不参与数据抓取,可独立部署在轻量服务器;
  2. Redis 中间件层:作为全集群共享存储中心,拆分四种数据存储区域:待抓取任务队列、已抓取 URL 去重集合、失败重试任务池、分布式锁存储键,所有爬虫节点共享同一份 Redis 数据,实现跨机器任务互通;
  3. 爬虫消费节点:多台物理 / 云服务器作为消费端,从 Redis 拉取待采集 URL 执行页面请求、数据解析、入库操作,单节点内部通过多进程拆分子任务,最大化利用单机硬件资源。

1.2 Redis 数据结构在爬虫中的分工对照表

表格

Redis 数据结构存储内容爬虫业务用途
List (链表)待采集原始 URL左侧入队生产任务,右侧阻塞弹出消费,实现 FIFO 先进先出任务队列
Set (无序集合)已完成抓取 URL全集群 URL 去重,入队前校验集合,规避重复入队造成重复采集
SortedSet (有序集合)抓取失败 URL依据失败次数设置分数,分数递增实现延迟重试,避免故障 URL 频繁占用资源
String (字符串)分布式锁标识单 URL 全局锁,防止多节点同一时刻抓取同一个链接造成重复入库

1.3 分布式爬虫核心运行逻辑

生产者依据站点分页规则批量构造目标 URL,校验 URL 未存入去重 Set 后写入 List 任务队列;所有爬虫消费节点启动后持续阻塞从 List 尾部取出任务,获取 URL 后抢占分布式锁,加锁成功执行页面采集,抓取成功则将 URL 写入已抓取 Set,抓取失败则存入 SortedSet 并累加失败分数;定时巡检 SortedSet,将达到重试阈值的失败链接重新放回主任务队列实现二次抓取,节点宕机未完成的任务仍留存 Redis,新节点启动后可继续消费剩余任务,天然支持断点续爬。

二、环境依赖安装与 Redis 基础连接配置

2.1 依赖批量安装指令

bash

运行

# Redis连接驱动 pip install redis==5.0.8 # 爬虫基础采集库 pip install requests==2.31.0 beautifulsoup4==4.12.3 lxml==5.3.0 # 数据入库ORM库 pip install sqlalchemy==2.0.35 pymysql==1.1.0 # 多进程进程池依赖(Python内置无需额外安装)

2.2 Redis 客户端全局连接初始化

python

运行

import redis # Redis连接配置,分布式集群所有节点共用同一Redis实例 REDIS_CONFIG = { "host": "127.0.0.1", "port": 6379, "db": 0, "password": "", "decode_responses": True # 自动解码bytes为字符串,省去编码转换 } # 初始化全局Redis连接对象 redis_client = redis.Redis(**REDIS_CONFIG) # Redis爬虫业务Key常量定义,统一管理避免硬编码 TASK_QUEUE_KEY = "dist:crawl:task_queue" DONE_URL_SET_KEY = "dist:crawl:done_url_set" FAIL_URL_ZSET_KEY = "dist:crawl:fail_url_zset" DISTRIBUTE_LOCK_PREFIX = "dist:crawl:lock:"
代码原理

decode_responses 参数开启后,Redis 返回数据自动转为 str 类型,适配 URL 字符串存储场景;业务 Key 统一常量定义便于后期批量修改 Redis 存储标识,区分不同爬虫项目缓存键避免数据混淆。

三、数据库 ORM 模型复用与入库逻辑

沿用前文商品数据表结构,实现爬虫数据统一落地 MySQL,全集群所有节点共用同一数据库实例:

python

运行

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime from sqlalchemy.orm import declarative_base, sessionmaker from datetime import datetime DB_CONF = {"user": "root", "password": "123456", "host": "127.0.0.1", "port": 3306, "db_name": "crawl_data", "charset": "utf8mb4"} DB_URL = f"mysql+pymysql://{DB_CONF['user']}:{DB_CONF['password']}@{DB_CONF['host']}:{DB_CONF['port']}/{DB_CONF['db_name']}?charset={DB_CONF['charset']}" engine = create_engine(DB_URL, pool_size=8, max_overflow=16, pool_recycle=3600, echo=False) Base = declarative_base() SessionDb = sessionmaker(bind=engine, autoflush=False, autocommit=False) class GoodsInfo(Base): __tablename__ = "dist_crawl_goods" id = Column(Integer, primary_key=True, autoincrement=True) goods_url = Column(String(512), index=True, comment="商品源链接") goods_name = Column(String(255), comment="商品名称") price = Column(Float, comment="售价") source_node = Column(String(64), comment="采集节点标识,记录哪台机器抓取") crawl_time = Column(DateTime, default=datetime.now) Base.metadata.create_all(bind=engine) def batch_save_db(data_list: list): """分布式节点统一批量入库函数""" sess = SessionDb() obj_list = [GoodsInfo(**item) for item in data_list] try: sess.add_all(obj_list) sess.commit() return len(obj_list) except Exception as e: sess.rollback() print(f"入库异常:{str(e)}") return 0 finally: sess.close()

四、模块一:分布式任务生产者实现(URL 生产 + 入队 + 前置去重)

生产者独立脚本运行,批量生成分页 URL,完成去重校验后写入 Redis 任务队列,可单独部署在一台服务器持续产出任务:

python

运行

from urllib.parse import urljoin import requests HEADERS = {"User-Agent": "Mozilla/5.0 Chrome/123.0.0.0 Safari/537.36"} BASE_DOMAIN = "https://demo-dist.com" def produce_crawl_task(start_page: int, end_page: int): """批量生产分页任务URL,入队Redis任务队列""" print(f"开始生产{start_page}~{end_page}页任务") for page in range(start_page, end_page + 1): page_url = urljoin(BASE_DOMAIN, f"/goods/list?page={page}") # 前置去重:已抓取URL不再重复入队 if redis_client.sismember(DONE_URL_SET_KEY, page_url): continue # 左侧LPUSH写入任务队列 redis_client.lpush(TASK_QUEUE_KEY, page_url) print(f"任务入队:{page_url}") print("本轮任务生产完毕") # 生产者启动入口 if __name__ == "__main__": produce_crawl_task(1, 200)
底层原理

sismember校验 URL 是否存在已抓取集合,存在直接跳过实现生产者层面去重;lpush从 List 链表左侧写入数据,消费端rpop从右侧取出,天然 FIFO 队列;生产者与消费者完全隔离,可随时新增生产者脚本加速任务产出。

五、模块二:分布式消费者实现(阻塞拉取 + 分布式锁 + 失败重试)

消费端分为两层:主进程循环阻塞从 Redis 拉取任务,内部开启多进程池并发抓取页面,搭配 Redis 分布式锁防止多节点抢占同一 URL 重复采集,抓取失败自动写入失败任务有序集合。

python

运行

from multiprocessing import Pool import random import time # 当前节点标识,入库时标记来源机器 NODE_TAG = "crawl_node_01" PROCESS_NUM = 4 # 单节点开启4个采集子进程 def get_dist_lock(url: str, expire: int = 30): """Redis分布式锁:SET key value EX expire NX,加锁成功返回True""" lock_key = DISTRIBUTE_LOCK_PREFIX + url # NX不存在才创建,EX设置锁过期时间防止死锁 lock_res = redis_client.set(lock_key, NODE_TAG, ex=expire, nx=True) return lock_res is not None def release_dist_lock(url: str): """释放分布式锁""" lock_key = DISTRIBUTE_LOCK_PREFIX + url redis_client.delete(lock_key) def single_goods_crawl(target_url: str): """单个URL采集逻辑:页面请求、数据解析、入库""" save_data = [] try: resp = requests.get(target_url, headers=HEADERS, timeout=10) resp.raise_for_status() # 模拟页面解析逻辑 for idx in range(random.randint(15,30)): item = { "goods_url": f"{target_url}#{idx}", "goods_name": f"分布式采集商品{random.randint(1000,9999)}", "price": round(random.uniform(9.9,2999),2), "source_node": NODE_TAG } save_data.append(item) # 抓取成功,URL存入已完成集合 redis_client.sadd(DONE_URL_SET_KEY, target_url) # 批量入库 succ_count = batch_save_db(save_data) print(f"[{NODE_TAG}]成功抓取:{target_url},入库{succ_count}条") return True except Exception as e: print(f"[{NODE_TAG}]抓取失败{target_url}:{str(e)}") # 抓取失败写入ZSET,默认失败分数1,后续重试分数+1 old_score = redis_client.zscore(FAIL_URL_ZSET_KEY, target_url) or 0 new_score = old_score + 1 redis_client.zadd(FAIL_URL_ZSET_KEY, {target_url: new_score}) return False finally: release_dist_lock(target_url) def pool_crawl_worker(url_list: list): """进程池批量消费一批URL""" with Pool(processes=PROCESS_NUM) as pool: pool.map(single_goods_crawl, url_list) def consumer_run(): """消费端主循环:阻塞拉取任务""" print(f"分布式爬虫节点{NODE_TAG}启动,进程数:{PROCESS_NUM}") while True: # brpop阻塞弹出,无任务时阻塞等待10秒释放资源,阻塞式消费 pop_res = redis_client.brpop(TASK_QUEUE_KEY, timeout=10) if not pop_res: continue _, task_url = pop_res # 抢占分布式锁,加锁失败说明其他节点正在抓取,丢弃本次任务 if not get_dist_lock(task_url): continue # 单条URL入进程池抓取 pool_crawl_worker([task_url]) # 消费节点启动 if __name__ == "__main__": consumer_run()
代码原理拆解
  1. 分布式锁原理set nx ex原子指令实现加锁,NX 保证仅首个请求节点创建锁,EX 设置过期时间规避爬虫异常崩溃导致死锁;
  2. brpop 阻塞消费:Redis 阻塞弹出指令,队列无任务时进程休眠,相比轮询无限空查节省服务器 CPU;
  3. SortedSet 失败任务:失败链接分数随报错次数递增,分数越高代表故障次数越多,便于定时过滤高失败链接人工排查。

六、模块三:失败任务定时重试调度器

独立定时脚本,定时从 SortedSet 筛选失败分数小于 5 的 URL(失败≤4 次),重新放回主任务队列,超过 5 次判定为失效链接永久沉淀不再重试:

python

运行

from apscheduler.schedulers.background import BackgroundScheduler def retry_fail_task(): """定时取回失败任务,重新入队主队列""" # zrange筛选分数0~4的失败链接 fail_url_list = redis_client.zrangebyscore(FAIL_URL_ZSET_KEY, min=0, max=4) if not fail_url_list: print("暂无需要重试的失败任务") return for url in fail_url_list: # 再次校验是否已被成功抓取 if redis_client.sismember(DONE_URL_SET_KEY, url): redis_client.zrem(FAIL_URL_ZSET_KEY, url) continue redis_client.lpush(TASK_QUEUE_KEY, url) print(f"本轮重试入队{len(fail_url_list)}个失败URL") def start_retry_scheduler(): scheduler = BackgroundScheduler() # 每30分钟执行一次失败任务重试 scheduler.add_job(retry_fail_task, "interval", minutes=30, id="fail_retry_job", replace_existing=True) scheduler.start() print("失败任务重试调度器启动成功") try: while True: time.sleep(3600) except KeyboardInterrupt: scheduler.shutdown() if __name__ == "__main__": start_retry_scheduler()

七、分布式扩容与任务分片落地方案

7.1 多节点横向扩容部署

  1. 多台服务器安装 Python、Redis 依赖,统一指向同一远端 Redis 服务(修改 REDIS_CONFIG 的 host 为公网 RedisIP);
  2. 每台机器单独启动消费端脚本,修改 NODE_TAG 区分不同节点标识;
  3. 生产者可部署多实例,调高分页生产范围,Redis 自动均衡分发任务至空闲节点。

7.2 海量任务分片生产优化

超十万 URL 场景下拆分生产者分片,多生产者分工生产不同页码区间:

  • 节点 A 生产者:produce_crawl_task (1,5000)
  • 节点 B 生产者:produce_crawl_task (5001,10000) 实现任务分片生产,避免单生产者性能瓶颈。

八、分布式爬虫故障排查与优化配置表

表格

故障现象诱因优化方案
多节点重复抓取同一个 URL未加分布式锁,网络延迟并发入队启用 Redis SET NX 分布式锁,设置合理锁过期时长
失败任务无限堆积 ZSET失效链接持续重试累加分数设置最大重试阈值,超阈值 ZREM 移除链接落地异常表
Redis 内存占用持续飙升大量已抓取 URL 长期留存 Set定时清理历史已抓取 URL,按月归档至 MySQL
消费节点空闲但队列堆积任务单进程消费效率不足调高单节点 PROCESS_NUM 进程数或新增消费节点

九、全链路综合启动部署规范

  1. 启动顺序:启动 Redis 服务→启动失败重试调度器→启动多台消费节点脚本→启动生产者脚本;
  2. 部署方式:Linux 环境使用 nohup 分别后台常驻生产者、消费者、重试调度三类脚本;
  3. 数据监控:通过 redis-cli 查看 LLEN 任务队列长度、SCARD 已抓取集合数量实时监控采集进度。
http://www.jsqmd.com/news/946942/

相关文章:

  • 蓝桥杯5G仿真平台保姆级配置指南:从BBU到核心网,手把手带你打通第一个5G呼叫
  • 2026年实测AI写作辅助平台榜单(实测甄选版)
  • 从‘nvidia-smi’到跑通第一个CUDA核函数:给Python开发者的CentOS服务器GPU编程初体验
  • Halcon region转图像踩坑实录:region_to_bin、region_to_label、region_to_mean到底怎么选?
  • 京东自动下单工具终极指南:4步实现24小时智能购物监控
  • 自制Digispark开发板:从ATtiny85芯片到USB可编程硬件的完整实践
  • STK卫星仿真出的数据怎么用?手把手教你将STK轨道导出为TLE格式(MATLAB联动篇)
  • 从零开始组装电脑:硬件选型、兼容性检查与装机全流程实战指南
  • 别再只盯着GPS了!手把手教你用Arduino解析北斗/GPS模块的NMEA 0183数据(附完整代码)
  • 3步搞定Mac鼠标指针个性化:Mousecape完整使用指南
  • RK3568双网口开发板,u-boot下如何固定网络设备?一个env变量ethact就搞定
  • 告别Redis?用C++手把手教你玩转LMDB:一个嵌入式内存映射数据库的实战入门
  • Qwen3.6-Plus实战:8分钟生成可部署官网的前端工作流
  • SpringBoot项目OOM排查实录:一个10MB的max-http-header-size配置是如何吃光8G堆内存的
  • 创客教育中电路设计的多元应用:从模块化到生活场景实践
  • 深入对比:ZYNQ7000上EMMC与SD卡的裸机驱动性能实测与选型建议
  • Nano Banana Pro深度实战:ARM64嵌入式Linux工作站硬核指南
  • 消费返利模式的底层困局:为什么很多平台从一开始就走不远?
  • 避坑指南:STM32F103标准库DAC配置常见误区(以PA4输出为例,含波形生成与缓存设置)
  • 哪家成都全屋定制品牌专业?2026年6月推荐TOP5儿童房环保安全评测特点市场份额 - 品牌推荐
  • KAN实战:用5行代码解决偏微分方程,参数效率比传统PINNs高100倍
  • 告别玄学:给你的STM32 Bootloader跳转函数加个‘安全检查清单’(含代码详解)
  • DeepSeek系列大模型本地部署与行业应用实践指南
  • C++多线程安全传参避坑指南:detach()模式下如何正确传递指针和对象?
  • 告别Windows 7!手把手教你用DevEco Studio 2.0.12.201搭建鸿蒙开发环境(附华为账号注册避坑)
  • STM32F103驱动RC522读写MIFARE卡并修改扇区密钥的可运行工程
  • 智能客服响应延迟骤降92%,企业AI工具整合避坑清单,仅剩最后87份内部文档模板
  • C++编写的BMP条形码定位与数字解码工具集(含预处理、频域增强与形态学操作)
  • 从汽车悬架到手机陀螺仪:阻尼振动微分方程在工程中的实际应用盘点
  • MATLAB工程仿真用代理模型全流程工具箱(含DOE设计、Kriging建模与EGO优化)