Python 爬虫分布式实战:Redis + 多进程爬虫实现分布式数据采集与任务分片
前言
单机爬虫受 CPU、网络带宽、IP 资源限制,在海量站点全量抓取场景中存在采集速度慢、任务无法拆分、故障难以续爬等短板,分布式爬虫通过任务拆解、多机器协同调度突破单机性能瓶颈。Redis 凭借高性能内存读写、数据结构丰富的特性成为分布式爬虫主流中间件,依托 List、Set、SortedSet 数据结构实现任务队列分发、去重过滤、采集状态标记,结合 Python 多进程、多线程实现单机资源最大化利用,搭配任务分片策略将海量目标 URL 拆分至多台爬虫节点并行消费。本文从分布式爬虫架构设计、Redis 各类数据结构落地场景、任务生产消费模型、URL 布隆去重、多进程任务隔离、宕机续爬机制、分布式锁防重复抓取全维度落地实战,配套完整可运行工程化代码与底层原理剖析,覆盖中小型分布式爬虫从开发到部署全流程。
本文所需依赖官方文档超链接:
- Redis-Py 官方开发文档
- Requests 官方文档
- BeautifulSoup4 官方文档
- SQLAlchemy 官方文档
一、分布式爬虫整体架构与 Redis 核心选型原理
1.1 三层分布式爬虫架构组成
分布式爬虫划分为任务生产端、分布式任务中间件、爬虫消费节点三层结构,三层解耦可独立部署扩容:
- 任务生产端:负责解析站点首页、分类页,批量生成待采集商品 / 资讯 URL,统一推送至 Redis 任务队列,仅做任务产出不参与数据抓取,可独立部署在轻量服务器;
- Redis 中间件层:作为全集群共享存储中心,拆分四种数据存储区域:待抓取任务队列、已抓取 URL 去重集合、失败重试任务池、分布式锁存储键,所有爬虫节点共享同一份 Redis 数据,实现跨机器任务互通;
- 爬虫消费节点:多台物理 / 云服务器作为消费端,从 Redis 拉取待采集 URL 执行页面请求、数据解析、入库操作,单节点内部通过多进程拆分子任务,最大化利用单机硬件资源。
1.2 Redis 数据结构在爬虫中的分工对照表
表格
| Redis 数据结构 | 存储内容 | 爬虫业务用途 |
|---|---|---|
| List (链表) | 待采集原始 URL | 左侧入队生产任务,右侧阻塞弹出消费,实现 FIFO 先进先出任务队列 |
| Set (无序集合) | 已完成抓取 URL | 全集群 URL 去重,入队前校验集合,规避重复入队造成重复采集 |
| SortedSet (有序集合) | 抓取失败 URL | 依据失败次数设置分数,分数递增实现延迟重试,避免故障 URL 频繁占用资源 |
| String (字符串) | 分布式锁标识 | 单 URL 全局锁,防止多节点同一时刻抓取同一个链接造成重复入库 |
1.3 分布式爬虫核心运行逻辑
生产者依据站点分页规则批量构造目标 URL,校验 URL 未存入去重 Set 后写入 List 任务队列;所有爬虫消费节点启动后持续阻塞从 List 尾部取出任务,获取 URL 后抢占分布式锁,加锁成功执行页面采集,抓取成功则将 URL 写入已抓取 Set,抓取失败则存入 SortedSet 并累加失败分数;定时巡检 SortedSet,将达到重试阈值的失败链接重新放回主任务队列实现二次抓取,节点宕机未完成的任务仍留存 Redis,新节点启动后可继续消费剩余任务,天然支持断点续爬。
二、环境依赖安装与 Redis 基础连接配置
2.1 依赖批量安装指令
bash
运行
# Redis连接驱动 pip install redis==5.0.8 # 爬虫基础采集库 pip install requests==2.31.0 beautifulsoup4==4.12.3 lxml==5.3.0 # 数据入库ORM库 pip install sqlalchemy==2.0.35 pymysql==1.1.0 # 多进程进程池依赖(Python内置无需额外安装)2.2 Redis 客户端全局连接初始化
python
运行
import redis # Redis连接配置,分布式集群所有节点共用同一Redis实例 REDIS_CONFIG = { "host": "127.0.0.1", "port": 6379, "db": 0, "password": "", "decode_responses": True # 自动解码bytes为字符串,省去编码转换 } # 初始化全局Redis连接对象 redis_client = redis.Redis(**REDIS_CONFIG) # Redis爬虫业务Key常量定义,统一管理避免硬编码 TASK_QUEUE_KEY = "dist:crawl:task_queue" DONE_URL_SET_KEY = "dist:crawl:done_url_set" FAIL_URL_ZSET_KEY = "dist:crawl:fail_url_zset" DISTRIBUTE_LOCK_PREFIX = "dist:crawl:lock:"代码原理
decode_responses 参数开启后,Redis 返回数据自动转为 str 类型,适配 URL 字符串存储场景;业务 Key 统一常量定义便于后期批量修改 Redis 存储标识,区分不同爬虫项目缓存键避免数据混淆。
三、数据库 ORM 模型复用与入库逻辑
沿用前文商品数据表结构,实现爬虫数据统一落地 MySQL,全集群所有节点共用同一数据库实例:
python
运行
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime from sqlalchemy.orm import declarative_base, sessionmaker from datetime import datetime DB_CONF = {"user": "root", "password": "123456", "host": "127.0.0.1", "port": 3306, "db_name": "crawl_data", "charset": "utf8mb4"} DB_URL = f"mysql+pymysql://{DB_CONF['user']}:{DB_CONF['password']}@{DB_CONF['host']}:{DB_CONF['port']}/{DB_CONF['db_name']}?charset={DB_CONF['charset']}" engine = create_engine(DB_URL, pool_size=8, max_overflow=16, pool_recycle=3600, echo=False) Base = declarative_base() SessionDb = sessionmaker(bind=engine, autoflush=False, autocommit=False) class GoodsInfo(Base): __tablename__ = "dist_crawl_goods" id = Column(Integer, primary_key=True, autoincrement=True) goods_url = Column(String(512), index=True, comment="商品源链接") goods_name = Column(String(255), comment="商品名称") price = Column(Float, comment="售价") source_node = Column(String(64), comment="采集节点标识,记录哪台机器抓取") crawl_time = Column(DateTime, default=datetime.now) Base.metadata.create_all(bind=engine) def batch_save_db(data_list: list): """分布式节点统一批量入库函数""" sess = SessionDb() obj_list = [GoodsInfo(**item) for item in data_list] try: sess.add_all(obj_list) sess.commit() return len(obj_list) except Exception as e: sess.rollback() print(f"入库异常:{str(e)}") return 0 finally: sess.close()四、模块一:分布式任务生产者实现(URL 生产 + 入队 + 前置去重)
生产者独立脚本运行,批量生成分页 URL,完成去重校验后写入 Redis 任务队列,可单独部署在一台服务器持续产出任务:
python
运行
from urllib.parse import urljoin import requests HEADERS = {"User-Agent": "Mozilla/5.0 Chrome/123.0.0.0 Safari/537.36"} BASE_DOMAIN = "https://demo-dist.com" def produce_crawl_task(start_page: int, end_page: int): """批量生产分页任务URL,入队Redis任务队列""" print(f"开始生产{start_page}~{end_page}页任务") for page in range(start_page, end_page + 1): page_url = urljoin(BASE_DOMAIN, f"/goods/list?page={page}") # 前置去重:已抓取URL不再重复入队 if redis_client.sismember(DONE_URL_SET_KEY, page_url): continue # 左侧LPUSH写入任务队列 redis_client.lpush(TASK_QUEUE_KEY, page_url) print(f"任务入队:{page_url}") print("本轮任务生产完毕") # 生产者启动入口 if __name__ == "__main__": produce_crawl_task(1, 200)底层原理
sismember校验 URL 是否存在已抓取集合,存在直接跳过实现生产者层面去重;lpush从 List 链表左侧写入数据,消费端rpop从右侧取出,天然 FIFO 队列;生产者与消费者完全隔离,可随时新增生产者脚本加速任务产出。
五、模块二:分布式消费者实现(阻塞拉取 + 分布式锁 + 失败重试)
消费端分为两层:主进程循环阻塞从 Redis 拉取任务,内部开启多进程池并发抓取页面,搭配 Redis 分布式锁防止多节点抢占同一 URL 重复采集,抓取失败自动写入失败任务有序集合。
python
运行
from multiprocessing import Pool import random import time # 当前节点标识,入库时标记来源机器 NODE_TAG = "crawl_node_01" PROCESS_NUM = 4 # 单节点开启4个采集子进程 def get_dist_lock(url: str, expire: int = 30): """Redis分布式锁:SET key value EX expire NX,加锁成功返回True""" lock_key = DISTRIBUTE_LOCK_PREFIX + url # NX不存在才创建,EX设置锁过期时间防止死锁 lock_res = redis_client.set(lock_key, NODE_TAG, ex=expire, nx=True) return lock_res is not None def release_dist_lock(url: str): """释放分布式锁""" lock_key = DISTRIBUTE_LOCK_PREFIX + url redis_client.delete(lock_key) def single_goods_crawl(target_url: str): """单个URL采集逻辑:页面请求、数据解析、入库""" save_data = [] try: resp = requests.get(target_url, headers=HEADERS, timeout=10) resp.raise_for_status() # 模拟页面解析逻辑 for idx in range(random.randint(15,30)): item = { "goods_url": f"{target_url}#{idx}", "goods_name": f"分布式采集商品{random.randint(1000,9999)}", "price": round(random.uniform(9.9,2999),2), "source_node": NODE_TAG } save_data.append(item) # 抓取成功,URL存入已完成集合 redis_client.sadd(DONE_URL_SET_KEY, target_url) # 批量入库 succ_count = batch_save_db(save_data) print(f"[{NODE_TAG}]成功抓取:{target_url},入库{succ_count}条") return True except Exception as e: print(f"[{NODE_TAG}]抓取失败{target_url}:{str(e)}") # 抓取失败写入ZSET,默认失败分数1,后续重试分数+1 old_score = redis_client.zscore(FAIL_URL_ZSET_KEY, target_url) or 0 new_score = old_score + 1 redis_client.zadd(FAIL_URL_ZSET_KEY, {target_url: new_score}) return False finally: release_dist_lock(target_url) def pool_crawl_worker(url_list: list): """进程池批量消费一批URL""" with Pool(processes=PROCESS_NUM) as pool: pool.map(single_goods_crawl, url_list) def consumer_run(): """消费端主循环:阻塞拉取任务""" print(f"分布式爬虫节点{NODE_TAG}启动,进程数:{PROCESS_NUM}") while True: # brpop阻塞弹出,无任务时阻塞等待10秒释放资源,阻塞式消费 pop_res = redis_client.brpop(TASK_QUEUE_KEY, timeout=10) if not pop_res: continue _, task_url = pop_res # 抢占分布式锁,加锁失败说明其他节点正在抓取,丢弃本次任务 if not get_dist_lock(task_url): continue # 单条URL入进程池抓取 pool_crawl_worker([task_url]) # 消费节点启动 if __name__ == "__main__": consumer_run()代码原理拆解
- 分布式锁原理:
set nx ex原子指令实现加锁,NX 保证仅首个请求节点创建锁,EX 设置过期时间规避爬虫异常崩溃导致死锁; - brpop 阻塞消费:Redis 阻塞弹出指令,队列无任务时进程休眠,相比轮询无限空查节省服务器 CPU;
- SortedSet 失败任务:失败链接分数随报错次数递增,分数越高代表故障次数越多,便于定时过滤高失败链接人工排查。
六、模块三:失败任务定时重试调度器
独立定时脚本,定时从 SortedSet 筛选失败分数小于 5 的 URL(失败≤4 次),重新放回主任务队列,超过 5 次判定为失效链接永久沉淀不再重试:
python
运行
from apscheduler.schedulers.background import BackgroundScheduler def retry_fail_task(): """定时取回失败任务,重新入队主队列""" # zrange筛选分数0~4的失败链接 fail_url_list = redis_client.zrangebyscore(FAIL_URL_ZSET_KEY, min=0, max=4) if not fail_url_list: print("暂无需要重试的失败任务") return for url in fail_url_list: # 再次校验是否已被成功抓取 if redis_client.sismember(DONE_URL_SET_KEY, url): redis_client.zrem(FAIL_URL_ZSET_KEY, url) continue redis_client.lpush(TASK_QUEUE_KEY, url) print(f"本轮重试入队{len(fail_url_list)}个失败URL") def start_retry_scheduler(): scheduler = BackgroundScheduler() # 每30分钟执行一次失败任务重试 scheduler.add_job(retry_fail_task, "interval", minutes=30, id="fail_retry_job", replace_existing=True) scheduler.start() print("失败任务重试调度器启动成功") try: while True: time.sleep(3600) except KeyboardInterrupt: scheduler.shutdown() if __name__ == "__main__": start_retry_scheduler()七、分布式扩容与任务分片落地方案
7.1 多节点横向扩容部署
- 多台服务器安装 Python、Redis 依赖,统一指向同一远端 Redis 服务(修改 REDIS_CONFIG 的 host 为公网 RedisIP);
- 每台机器单独启动消费端脚本,修改 NODE_TAG 区分不同节点标识;
- 生产者可部署多实例,调高分页生产范围,Redis 自动均衡分发任务至空闲节点。
7.2 海量任务分片生产优化
超十万 URL 场景下拆分生产者分片,多生产者分工生产不同页码区间:
- 节点 A 生产者:produce_crawl_task (1,5000)
- 节点 B 生产者:produce_crawl_task (5001,10000) 实现任务分片生产,避免单生产者性能瓶颈。
八、分布式爬虫故障排查与优化配置表
表格
| 故障现象 | 诱因 | 优化方案 |
|---|---|---|
| 多节点重复抓取同一个 URL | 未加分布式锁,网络延迟并发入队 | 启用 Redis SET NX 分布式锁,设置合理锁过期时长 |
| 失败任务无限堆积 ZSET | 失效链接持续重试累加分数 | 设置最大重试阈值,超阈值 ZREM 移除链接落地异常表 |
| Redis 内存占用持续飙升 | 大量已抓取 URL 长期留存 Set | 定时清理历史已抓取 URL,按月归档至 MySQL |
| 消费节点空闲但队列堆积任务 | 单进程消费效率不足 | 调高单节点 PROCESS_NUM 进程数或新增消费节点 |
九、全链路综合启动部署规范
- 启动顺序:启动 Redis 服务→启动失败重试调度器→启动多台消费节点脚本→启动生产者脚本;
- 部署方式:Linux 环境使用 nohup 分别后台常驻生产者、消费者、重试调度三类脚本;
- 数据监控:通过 redis-cli 查看 LLEN 任务队列长度、SCARD 已抓取集合数量实时监控采集进度。
