别再手动抄代码了!用Python+efinance批量抓取A股全量数据(附完整脚本与MongoDB存储方案)
Python量化实战:用efinance构建A股全量数据库的工程化实践
在量化投资领域,数据是策略开发的基石。传统的手动下载和整理股票数据不仅效率低下,还容易出错。我曾见过一位量化研究员因为手动复制粘贴错了一行数据,导致回测结果完全失真——这个教训价值百万。本文将分享如何用Python的efinance库构建自动化数据管道,从零开始搭建本地A股数据库。
1. 环境准备与工具链搭建
1.1 核心工具选型
构建稳定可靠的数据采集系统需要精心挑选工具链。经过多次实践验证,我确定了以下技术组合:
- efinance:非官方但稳定的金融数据接口,相比Tushare更轻量
- MongoDB:文档型数据库,天然适合存储非结构化的行情数据
- Loguru:比标准logging更友好的日志工具
- Tqdm:进度条可视化,让长时间运行的任务不再"盲跑"
安装这些工具只需一行命令:
pip install efinance loguru tqdm pymongo提示:建议使用Python 3.8+环境,某些库在新版本Python中可能有兼容性问题
1.2 工程化目录结构
好的项目结构能大幅降低维护成本。这是我常用的目录布局:
stock_data_pipeline/ ├── config/ # 配置文件 ├── logs/ # 日志文件 ├── src/ # 源代码 │ ├── crawler.py # 数据采集主程序 │ └── utils.py # 工具函数 └── requirements.txt # 依赖清单2. 高效数据采集策略
2.1 股票代码批量获取
efinance的get_realtime_quotes()能获取全部活跃股票列表,但实际使用中我发现几个优化点:
import efinance as ef def fetch_stock_codes(): """获取全量股票代码并去重""" df = ef.stock.get_realtime_quotes() # 过滤掉B股和新三板 codes = df[ (df['市场类型'] == '沪深A股') & (~df['股票代码'].str.startswith('8')) ]['股票代码'].unique() return codes.tolist()2.2 历史行情数据下载
直接循环请求容易被封IP,需要加入以下防护措施:
- 随机延时(2-5秒)
- 自动重试机制
- 异常捕获与日志记录
from loguru import logger from time import sleep import random def safe_fetch_history(stock_code, retry=3): """带防护措施的历史数据下载""" for i in range(retry): try: df = ef.stock.get_quote_history(stock_code) return df except Exception as e: logger.error(f"{stock_code} 第{i+1}次失败: {str(e)}") sleep(random.uniform(2, 5)) return None3. MongoDB存储优化方案
3.1 批量写入提升性能
逐条插入是MongoDB的性能杀手。这是我优化后的批量写入方案:
from pymongo import UpdateOne def bulk_upsert(collection, data_list): """批量更新插入操作""" operations = [ UpdateOne( {'_id': item['_id']}, {'$set': item}, upsert=True ) for item in data_list ] if operations: collection.bulk_write(operations)3.2 数据模型设计
合理的文档结构能提升查询效率。我的设计原则是:
- 以股票代码作为
_id主键 - 将K线数据按日期嵌套存储
- 添加元数据方便检索
示例文档结构:
{ "_id": "600519", "name": "贵州茅台", "industry": "酿酒", "daily_data": { "2023-01-04": { "open": 1800.0, "close": 1820.5, "volume": 32500 } } }4. 实战:构建完整数据管道
4.1 主程序架构
将各个模块组合成完整工作流:
from tqdm import tqdm def run_pipeline(): codes = fetch_stock_codes() client = pymongo.MongoClient() db = client['stock_db'] with tqdm(codes) as pbar: for code in pbar: pbar.set_description(f"处理 {code}") data = safe_fetch_history(code) if data is not None: processed = process_data(code, data) bulk_upsert(db.daily, [processed]) sleep(random.uniform(1, 3))4.2 异常处理与监控
添加以下保障措施确保长时间运行稳定:
- 断点续传:记录已处理的股票代码
- 内存监控:定期检查内存使用情况
- 心跳检测:每100只股票输出一次状态报告
import psutil def memory_guard(threshold=0.9): """内存保护机制""" if psutil.virtual_memory().percent > threshold: logger.warning("内存使用过高,暂停处理") sleep(60)5. 进阶优化技巧
5.1 分布式采集方案
当需要采集全市场多年数据时,单机可能需数天时间。可以考虑:
- 按股票代码分段,多进程处理
- 使用Redis作为任务队列
- 云函数动态扩展采集节点
from multiprocessing import Pool def distributed_crawl(): codes = fetch_stock_codes() with Pool(processes=4) as pool: pool.map(process_stock, codes)5.2 数据质量检查
采集完成后应进行完整性验证:
- 检查每只股票的数据量是否合理
- 验证关键字段无空值
- 对比最新数据与公开源是否一致
def validate_data(db): """数据质量检查""" problematic = [] for code in db.daily.distinct("_id"): count = db.daily.count_documents({"_id": code}) if count < 100: # 假设正常股票至少有100个交易日数据 problematic.append(code) return problematic在实际项目中,这套系统帮我节省了数百小时的手工操作时间。最关键的体会是:不要追求一次性完美,先构建最小可行方案,再逐步迭代优化。比如最初可以只采集收盘价,等管道稳定后再扩展其他字段。
