Python 爬虫实战:ThreadPoolExecutor 线程池、Redis 指纹持久化去重与定时爬虫数据分片归档实战
前言
伴随多线程并发抓取、文件与数据库多存储方案落地,现有爬虫工程出现两处典型短板:原生 Queue+threading 自研线程池代码冗余、任务异常管控繁琐;内存 set 集合存储 MD5 指纹仅在单次程序运行生效,进程重启后去重记录全部丢失,重复抓取问题复现;同时天气定时、文库批量爬虫长期持续采集会造成单数据表数据体量臃肿,全量查询、数据备份效率持续下滑。本篇聚焦工程化优化,使用 Python 内置concurrent.futures.ThreadPoolExecutor标准化线程池重构全部并发逻辑,依托 Redis 实现指纹持久化存储完成跨进程、跨重启永久去重,落地 MySQL 数据表按日期分片归档存储方案,同步改造前五期所有项目源码。 本篇新增依赖库官方文档链接汇总:
- redis-py:Python 操作 Redis 客户端库,实现集合、字符串等数据结构读写
- concurrent.futures 官方文档:内置线程 / 进程池标准库,无需额外安装
- python-dateutil:日期处理工具库,用于自动生成分片数据表名称
一、ThreadPoolExecutor 标准线程池原理与工具封装
1.1 线程池选型对比
表格
| 实现方案 | 开发成本 | 异常管控 | 适用场景 |
|---|---|---|---|
| 原生 threading+Queue | 高,需手动编写任务队列、任务完成标识 | 需自行捕获子线程异常 | 自定义深度定制特殊调度逻辑 |
| ThreadPoolExecutor | 低,内置任务调度、结果获取、超时管控 | 内置异常捕获回调 | 常规爬虫 IO 并发,主流工程选用 |
ThreadPoolExecutor 基于生产者消费者模型封装,内置最大并发数限制、任务提交、结果阻塞获取逻辑,提供 submit、map 两类任务提交方式,爬虫领域优先选用 submit 实现异构 URL 任务分发。
1.2 依赖安装
bash
运行
pip install redis python-dateutil1.3 通用线程池工具类
python
运行
from concurrent.futures import ThreadPoolExecutor, as_completed from spider_log import spider_log from SleepDelayUtil import SleepDelayUtil class ExecutorPoolUtil: def __init__(self, max_workers: int = 4): self.pool = ThreadPoolExecutor(max_workers=max_workers) def batch_submit_task(self, crawl_func, task_list, random_sleep_min=0.5, random_sleep_max=2): """ 批量提交爬虫任务 :param crawl_func: 单任务执行函数 :param task_list: 待执行任务参数列表 :param random_sleep_min: 请求最小延时 :param random_sleep_max: 请求最大延时 :return: 全部任务返回结果列表 """ future_dict = {} result_data = [] # 循环提交任务 for task_item in task_list: future = self.pool.submit(crawl_func, task_item) future_dict[future] = task_item SleepDelayUtil.random_sleep(random_sleep_min, random_sleep_max) # 遍历已完成任务 for future in as_completed(future_dict): task_info = future_dict[future] try: res = future.result() if res: result_data.append(res) spider_log.info(f"任务{task_info}抓取完成") except Exception as err: spider_log.error(f"任务{task_info}执行异常:{str(err)}") self.pool.shutdown(wait=True) return result_data1.4 代码原理详解
max_workers限定最大并发线程数量,从源头避免并发超限触发站点反爬;submit提交单个任务,返回 Future 对象,存储任务与 Future 映射关系便于异常溯源;as_completed阻塞遍历完成的任务,优先处理执行完毕的抓取任务,无需等待全部任务结束;shutdown(wait=True)等待所有子线程执行完毕后释放线程资源,避免线程泄漏。
1.5 技术社区爬虫并发改造示例
python
运行
def single_page_crawl(page_num): base_url = "https://example.com/tech/posts" html = self.get_list_page(base_url, page_num) page_data = self.parse_post_list(html) return page_data if __name__ == "__main__": page_task = [i for i in range(1,6)] pool = ExecutorPoolUtil(max_workers=2) all_data = pool.batch_submit_task(single_page_crawl, page_task, random_sleep_min=1.2, random_sleep_max=3.5)二、Redis 环境部署与持久化 MD5 去重工具开发
2.1 Redis 集合去重底层逻辑
Redis 的 Set 集合具备元素唯一性,重复添加相同 MD5 字符串时集合自动去重,数据落地 Redis 内存(可开启 RDB/AOF 持久化落盘),程序重启、多进程、多服务器分布式场景下指纹数据永久留存,是工业爬虫通用去重方案。
2.2 Redis 连接配置与工具类
python
运行
import redis from DataHashUtil import DataHashUtil from spider_log import spider_log class RedisDedupUtil: def __init__(self, redis_host="127.0.0.1", redis_port=6379, redis_db=0, dedup_key="spider:url:filter"): try: self.rd = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True) self.dedup_set_key = dedup_key self.rd.ping() spider_log.info("Redis去重客户端连接成功") except Exception as e: spider_log.error(f"Redis连接失败:{str(e)}") self.rd = None def is_url_exist(self, target_url: str) -> bool: """校验URL是否已采集,True代表重复无需抓取""" if not self.rd: return False url_md5 = DataHashUtil.get_md5(target_url) exist_flag = self.rd.sismember(self.dedup_set_key, url_md5) return exist_flag == 1 def add_record(self, target_url: str): """抓取成功后将指纹存入Redis集合""" if not self.rd: return url_md5 = DataHashUtil.get_md5(target_url) self.rd.sadd(self.dedup_set_key, url_md5)2.3 文库爬虫接入 Redis 去重逻辑
在请求 HTML 之前增加重复校验:
python
运行
def get_page_html(self, url): # 校验是否已采集 if redis_dedup.is_url_exist(url): spider_log.info(f"链接{url}已抓取,跳过采集") return None try: resp = send_http_request(url, headers=self.headers, proxies=proxy_tool.get_random_proxy()) html = resp.text # 抓取成功写入指纹 redis_dedup.add_record(url) return html except Exception as e: spider_log.error(f"请求异常:{str(e)}") return None2.4 拓展:基于标题 + 正文混合指纹去重
针对同源不同 URL、内容重复的数据,拼接标题与摘要生成 MD5,更换 Redis 存储 Key 实现内容去重,适用于资讯文库类重复文摘过滤。
三、MySQL 数据表按日期分片归档方案落地
3.1 分片存储适用场景
天气定时爬虫每日循环入库、文库长期增量采集会造成单表百万级数据,分片规则:按自然年月日自动生成数据表,例如weather_data_20250520,每日新数据写入当日分片表,历史数据归档留存,查询、备份、清理数据按日期定向操作,优化数据库读写性能。
3.2 日期分片工具类
python
运行
from dateutil import tz from datetime import datetime from pymysql import connect from DB_CONFIG import DB_CONFIG class TableSplitUtil: @staticmethod def get_split_table_name(base_table: str) -> str: """生成当日分片表名""" now = datetime.now(tz=tz.gettz("Asia/Shanghai")) date_str = now.strftime("%Y%m%d") return f"{base_table}_{date_str}" @staticmethod def create_table_if_not_exist(table_name: str, create_sql: str): """不存在则自动创建分片数据表""" try: conn = connect(**DB_CONFIG) cur = conn.cursor() cur.execute(create_sql.format(table=table_name)) conn.commit() cur.close() conn.close() except Exception as e: spider_log.error(f"分片表创建异常:{str(e)}")3.3 天气数据表分片建表语句模板
sql
CREATE TABLE IF NOT EXISTS {table} ( id INT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID', city_name VARCHAR(50) NOT NULL COMMENT '城市名称', temperature VARCHAR(20) COMMENT '当前温度', weather VARCHAR(50) COMMENT '天气状况', wind_direction VARCHAR(50) COMMENT '风向', humidity VARCHAR(20) COMMENT '湿度', collect_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '采集时间' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '天气分片数据表';3.4 定时爬虫入库逻辑改造
python
运行
# 生成当日分片表 base_name = "weather_data" split_table = TableSplitUtil.get_split_table_name(base_name) create_sql_template = "上面建表语句" TableSplitUtil.create_table_if_not_exist(split_table, create_sql_template) # 动态替换入库表名 insert_sql = f"INSERT INTO {split_table}(city_name,temperature,weather,wind_direction,humidity) VALUES(%s,%s,%s,%s,%s)"四、全项目改造优化清单
表格
| 项目名称 | 优化改造内容 | 落地收益 |
|---|---|---|
| 基础异常请求爬虫 | ThreadPoolExecutor 重构并发测试 + Redis 临时 URL 去重 | 测试重复链接自动跳过,并发代码精简 60% |
| 开源文库文摘爬虫 | 标准线程池批量抓取 + Redis 永久指纹去重 + 日期分片入库 | 批量抓取效率稳定,重启不重复抓取,数据表分日归档 |
| 天气 API 定时爬虫 | 分片表存储 + Redis 去重当日重复采集 | 单表数据不会无限膨胀,规避定时重复采集同一时段数据 |
| 百科词条爬虫 | 线程池多词条并发 + Redis 内容指纹去重 | 批量词条高效抓取,剔除内容重复百科条目 |
| 技术社区爬虫 | 分页并发标准化改造 + Redis 分页 URL 去重 | 重复页码自动跳过,多轮采集无冗余数据 |
五、工程拓展与运维优化方向
5.1 后续进阶拓展
- Redis 设置 Key 过期时间,实现短期临时去重,适配需要隔日重复采集的天气类项目;
- 使用 Redis Hash 结构存储爬虫采集统计数据(采集总量、失败数量);
- 基于 Apscheduler 配合分片表,每月自动执行历史数据表备份脚本。
5.2 运维合规提醒
Redis 仅做采集标识存储,不缓存爬取的原始业务数据;分片存储仍需遵循爬虫合规规范,不可通过并发与代理高频越权抓取非公开数据,定时任务配置合理采集间隔,规避对目标接口与服务器造成负载压力。
