Python+AKShare实战:5分钟搭建LOF基金溢价监控系统(附微信推送配置)
Python+AKShare实战:5分钟搭建LOF基金溢价监控系统(附微信推送配置)
对于金融科技爱好者和量化投资新手来说,LOF基金的溢价套利是一个值得关注的机会。但手动监控溢价率、筛选符合条件的基金既耗时又容易错过最佳时机。本文将带你用Python和AKShare快速搭建一个自动化监控系统,实现从数据获取到微信通知的完整闭环。
1. 环境准备与依赖安装
在开始之前,确保你的Python环境已经就绪。推荐使用Python 3.7或更高版本。我们将使用以下核心库:
pip install akshare pandas requestsAKShare是一个免费、开源的金融数据接口库,特别适合个人开发者和小型团队使用。它提供了丰富的金融数据接口,包括基金、股票、期货等市场数据。
提示:如果你在国内访问AKShare数据源较慢,可以尝试使用镜像源安装依赖:
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple akshare pandas requests
2. 获取LOF基金基础数据
首先,我们需要获取所有LOF基金的基本信息。AKShare提供了fund_purchase_em()接口来获取基金申购信息:
import akshare as ak def get_lof_funds(): # 获取所有基金的申购信息 fund_df = ak.fund_purchase_em() # 筛选LOF基金并排除ETF lof_df = fund_df[~fund_df['基金代码'].str.startswith('0')] lof_df = lof_df[~lof_df['基金简称'].str.contains('ETF')] # 添加市场前缀(上海/深圳) lof_df['基金代码'] = lof_df['基金代码'].map(lambda x: 'SH'+x if x.startswith('5') else 'SZ'+x) return lof_df这个函数会返回一个包含所有LOF基金信息的DataFrame,包括基金代码、简称、类型、申购状态等关键信息。
3. 获取实时溢价率数据
获取基金的基础信息后,我们需要获取每只LOF基金的实时溢价率。这里我们可以通过雪球网的接口来获取:
import requests def get_premium_rate(stock_code): headers = { 'User-Agent': 'Mozilla/5.0', 'Referer': 'https://xueqiu.com/' } params = { 'symbol': stock_code, 'extend': 'detail' } try: response = requests.get( 'https://stock.xueqiu.com/v5/stock/quote.json', params=params, headers=headers, timeout=5 ) data = response.json() return float(data['data']['quote']['premium_rate']) / 100 # 转换为小数形式 except Exception as e: print(f"获取溢价率失败: {e}") return None4. 构建监控核心逻辑
现在我们将前两步的功能整合起来,构建完整的监控逻辑:
def monitor_lof_premium(): # 获取LOF基金列表 lof_df = get_lof_funds() # 只关注限购的QDII基金 target_df = lof_df[ (lof_df['申购状态'] == '限大额') & (lof_df['基金类型'].str.contains('QDII|海外')) ] results = [] for _, row in target_df.iterrows(): code = row['基金代码'] rate = get_premium_rate(code) if rate is not None: results.append({ '基金代码': code, '基金简称': row['基金简称'], '基金类型': row['基金类型'], '日累计限额': row['日累计限定金额'], '溢价率': rate }) return pd.DataFrame(results)这个函数会返回一个包含限购QDII LOF基金及其溢价率的DataFrame。你可以根据需要调整筛选条件,比如只关注溢价率超过某个阈值的基金。
5. 微信消息推送配置
监控到符合条件的基金后,我们需要设置微信通知。这里我们使用企业微信的Webhook功能:
def send_wechat_message(content, webhook_url): data = { "msgtype": "text", "text": { "content": content, "mentioned_mobile_list": ["13800138000"] # 需要@的人的手机号 } } response = requests.post( webhook_url, json=data, headers={'Content-Type': 'application/json'} ) return response.status_code == 200要使用这个功能,你需要先在企微群聊中添加"群机器人",获取Webhook地址。将地址替换到下面的代码中:
def main(): df = monitor_lof_premium() # 筛选溢价率超过5%的基金 high_premium_df = df[df['溢价率'] > 0.05] if not high_premium_df.empty: message = "发现LOF基金溢价机会:\n" message += high_premium_df.to_string(index=False) # 替换为你的实际Webhook地址 webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your-key" send_wechat_message(message, webhook_url)6. 系统优化与定时运行
为了让系统持续运行,你可以设置定时任务。在Linux/Mac上可以使用crontab:
# 每天上午9:30和下午1:30各运行一次 30 9,13 * * * /usr/bin/python3 /path/to/your/script.py在Windows上可以使用任务计划程序。为了提高系统的稳定性,建议添加以下优化:
- 异常处理:网络请求添加重试机制 2.日志记录:记录每次运行的结果和异常 3.性能优化:缓存不变的基金基础信息 4.去重机制:避免重复发送相同机会的通知
import logging from datetime import datetime # 配置日志 logging.basicConfig( filename='lof_monitor.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) def main(): try: logging.info("开始执行LOF基金溢价监控") df = monitor_lof_premium() # 记录所有监控结果 logging.info(f"监控结果:\n{df.to_string()}") # 筛选并发送通知 high_premium_df = df[df['溢价率'] > 0.05] if not high_premium_df.empty: message = f"{datetime.now().strftime('%Y-%m-%d %H:%M')} 发现LOF基金溢价机会:\n" message += high_premium_df.to_string(index=False) webhook_url = "your_webhook_url" if send_wechat_message(message, webhook_url): logging.info("微信通知发送成功") else: logging.warning("微信通知发送失败") else: logging.info("未发现符合条件的溢价机会") except Exception as e: logging.error(f"监控执行失败: {str(e)}")7. 进阶功能扩展
基础功能实现后,你可以考虑添加以下进阶功能:
- 历史溢价率分析:记录历史数据,识别溢价率波动模式 2.多平台通知:增加邮件、短信等通知方式 3.交易信号生成:结合其他指标生成交易建议 4.可视化界面:使用Flask或Dash构建简单的前端界面
# 示例:历史数据存储与分析 import sqlite3 from datetime import datetime def save_to_database(df): conn = sqlite3.connect('lof_data.db') df['timestamp'] = datetime.now() df.to_sql('premium_records', conn, if_exists='append', index=False) conn.close() def analyze_historical_data(): conn = sqlite3.connect('lof_data.db') query = """ SELECT 基金代码, 基金简称, AVG(溢价率) as avg_premium, MAX(溢价率) as max_premium, COUNT(*) as records FROM premium_records GROUP BY 基金代码, 基金简称 ORDER BY avg_premium DESC """ result = pd.read_sql(query, conn) conn.close() return result这个系统虽然简单,但已经具备了核心功能。在实际使用中,我发现最关键的几点是:
- 确保数据源的稳定性,AKShare和雪球接口偶尔会有变动
- 合理设置监控频率,避免过于频繁请求导致IP被封
- 仔细计算交易成本,确保溢价率足够覆盖所有费用
- 对限购额度小的基金要保持谨慎,可能无法实现规模套利
