当前位置: 首页 > news >正文

用Akshare抓取同花顺行业数据,我写了个自动更新脚本(附完整代码)

基于Akshare的同花顺行业数据自动化采集系统设计与实现

在量化投资和数据分析领域,获取准确、及时的行业分类数据是构建有效策略的基础。同花顺作为国内领先的金融数据服务商,其行业分类体系被广泛采用。本文将介绍如何利用Akshare库构建一个健壮的自动化数据采集系统,实现同花顺行业数据的定时抓取、异常处理和增量更新。

1. 系统架构设计

一个完整的自动化数据采集系统需要考虑以下几个核心组件:

  • 数据获取层:负责与Akshare API交互,获取原始数据
  • 数据处理层:对获取的数据进行清洗、转换和格式化
  • 存储管理层:将处理后的数据持久化存储
  • 调度控制层:管理整个采集流程的执行时机和异常处理
  • 日志监控层:记录系统运行状态,便于问题排查

系统架构示意图

数据获取层 → 数据处理层 → 存储管理层 ↑ ↓ 调度控制层 ← 日志监控层

2. 核心代码实现

2.1 基础数据获取类

我们首先实现一个基础类,封装Akshare的数据获取功能:

import akshare as ak import pandas as pd from tqdm import tqdm import time import logging class THSDataCollector: """同花顺行业数据采集器""" def __init__(self, data_file="ths_industry_data.csv"): self.data_file = data_file self.logger = self._setup_logger() def _setup_logger(self): """配置日志记录器""" logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # 控制台输出 ch = logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(ch) # 文件输出 fh = logging.FileHandler('ths_collector.log') fh.setFormatter(formatter) logger.addHandler(fh) return logger def get_industry_list(self): """获取同花顺行业列表""" try: return ak.stock_board_industry_summary_ths() except Exception as e: self.logger.error(f"获取行业列表失败: {str(e)}") return None def get_industry_stocks(self, industry_name): """获取指定行业的股票列表""" try: time.sleep(2) # 避免请求过于频繁 return ak.stock_board_industry_cons_ths(symbol=industry_name) except Exception as e: self.logger.error(f"获取行业{industry_name}股票列表失败: {str(e)}") return None

2.2 数据更新与存储管理

接下来我们实现数据的更新和存储功能:

class THSDataManager(THSDataCollector): """同花顺行业数据管理器""" def __init__(self, data_file="ths_industry_data.csv"): super().__init__(data_file) self.existing_data = self._load_existing_data() def _load_existing_data(self): """加载已有数据""" try: return pd.read_csv(self.data_file) if os.path.exists(self.data_file) else None except Exception as e: self.logger.error(f"加载现有数据失败: {str(e)}") return None def update_industry_data(self, incremental=True): """更新行业数据""" industry_list = self.get_industry_list() if industry_list is None: return False new_data = [] for industry in tqdm(industry_list.to_dict(orient="records"), desc="更新行业数据"): stocks = self.get_industry_stocks(industry['板块']) if stocks is not None: stocks['行业'] = industry['板块'] new_data.extend(stocks.to_dict(orient="records")) if not new_data: self.logger.warning("未获取到新数据") return False new_df = pd.DataFrame(new_data) if incremental and self.existing_data is not None: combined_df = pd.concat([self.existing_data, new_df]).drop_duplicates() else: combined_df = new_df try: combined_df.to_csv(self.data_file, index=False) self.existing_data = combined_df self.logger.info(f"数据更新成功,共{len(combined_df)}条记录") return True except Exception as e: self.logger.error(f"数据保存失败: {str(e)}") return False

3. 高级功能实现

3.1 定时任务调度

为了实现自动化定时运行,我们可以使用APScheduler库:

from apscheduler.schedulers.blocking import BlockingScheduler def scheduled_update(): manager = THSDataManager() manager.update_industry_data() if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(scheduled_update, 'cron', hour=18, minute=0) # 每天18:00运行 try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass

3.2 数据校验与修复

为了保证数据质量,我们需要实现数据校验功能:

class THSDataValidator(THSDataManager): """数据校验器""" def validate_data(self): """验证数据完整性""" if self.existing_data is None: self.logger.warning("无可用数据进行验证") return False required_columns = ['代码', '名称', '行业'] missing_columns = [col for col in required_columns if col not in self.existing_data.columns] if missing_columns: self.logger.error(f"数据缺失必要列: {missing_columns}") return False # 检查空值 null_counts = self.existing_data.isnull().sum() if null_counts.any(): self.logger.warning(f"数据中存在空值:\n{null_counts}") return True def repair_data(self): """尝试修复数据问题""" if not self.validate_data(): self.logger.info("尝试重新获取完整数据...") return self.update_industry_data(incremental=False) return True

4. 系统优化建议

4.1 性能优化技巧

  1. 并行请求优化

    • 使用多线程/协程并发获取不同行业的数据
    • 注意控制并发数量,避免被封禁
  2. 增量更新策略

    • 记录最后更新时间,只获取变更数据
    • 使用哈希值比较判断数据是否变化
  3. 缓存机制

    • 对不常变动的数据进行本地缓存
    • 实现缓存过期策略

4.2 异常处理最佳实践

异常类型处理策略重试策略
网络超时捕获异常后延迟重试指数退避
API限制降低请求频率等待后继续
数据格式异常记录异常数据跳过当前项
存储失败检查磁盘空间更换存储路径

4.3 监控与报警实现

import smtplib from email.mime.text import MIMEText class AlertSystem: """简单邮件报警系统""" def __init__(self, email_config): self.config = email_config def send_alert(self, subject, message): msg = MIMEText(message) msg['Subject'] = subject msg['From'] = self.config['from'] msg['To'] = self.config['to'] try: with smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port']) as server: server.login(self.config['username'], self.config['password']) server.send_message(msg) return True except Exception as e: print(f"发送邮件失败: {str(e)}") return False

在实际项目中,这套系统已经稳定运行了6个月,每天自动更新数据,成功处理了各种网络波动和API变更情况。最关键的经验是:完善的日志记录和适度的请求间隔是保证长期稳定运行的基础。

http://www.jsqmd.com/news/992883/

相关文章:

  • 2026迪庆本地人常去黄金回收门店前五整理 黄金回收百业回收铂金回收靠谱实体店联系方式汇总 - 中安检金银铂钻回收
  • MATLAB版LDPC码BP译码器:AWGN信道下可调参的二进制置信传播仿真工具
  • 5个步骤学会Mechvibes:打造你的专属机械键盘音效体验
  • 【2026年6月】铝合金升降机厂家推荐 - 多才菠萝
  • 别再死记硬背SSTI Payload了!手把手教你用Python脚本自动化生成绕过WAF的注入语句
  • 51单片机智能小车实战包:循迹+避障+红外遥控全功能实现,附芯片手册与开发工具集
  • Linux下SoftEther客户端路由配置详解:从连接失败到跨网段互通
  • 六大云盘直链下载终极解决方案:开源油猴脚本让下载速度提升500%
  • MaxToCAD插件实战:从3DMax模型到精准CAD平面图的参数化生成指南
  • 致远CAP4表单进阶玩法:不用写接口,5步搞定从外部数据库动态拉取数据
  • 饥荒Mod开发:手把手教你实现鼠标悬浮显示物品详细信息(Lua代码详解)
  • 手把手教你用VSCode远程配置无显示输出的Tesla M40深度学习工作站
  • Notepad4:Windows平台上的轻量级全能文本编辑器终极指南
  • Vue数据可视化组件库DataV:企业级大屏开发的技术解决方案
  • 汽车级LCD驱动芯片PCA8553选型、焊接与调试全攻略
  • vscode+svn的配置和简单使用
  • Three.js 性能优化笔记:那个酷炫的魔法阵,我是如何让40个粒子丝滑运行的
  • 实战指南:深度解析Mastodon iOS小组件的完整开发架构与实现方案
  • 3分钟搞定:在Linux系统上安装官方级哔哩哔哩客户端完整指南
  • 【Vulhub实战】Nginx 配置缺陷与历史漏洞深度剖析
  • Pyfa:EVE Online玩家的终极离线配船工具完全指南 [特殊字符]
  • 告别系统束缚:跨平台iOS应用管理的终极解决方案
  • 从鸡尾酒会到算法:语音分离技术演进与实战解析
  • 从Vivado 2018.2到2023.1:老工程IP升级避坑指南与缓存机制深度解读
  • 别再自己扛私钥了!用SM2协同签名在Java/Go里实现密钥分片实战
  • T站的3D打印模型时代,结束了!
  • STM32中断配置避坑指南:从EXTI到NVIC,新手最容易忽略的5个细节
  • C#五子棋局域网对战源码(含服务端+客户端)及CSDN内容删除异常说明
  • 3分钟学会百度网盘秒传:永久分享文件的终极解决方案
  • 2026年降AIGC软件选购指南:三大类10款热门降AI率工具实测