当前位置: 首页 > news >正文

用Python复现通达信Winner函数:手把手教你估算A股筹码分布与获利盘比例

用Python构建A股筹码分布分析引擎:从Winner函数原理到量化实战

在传统股票分析软件中,Winner函数一直是技术派投资者研判市场情绪的重要工具。这个看似简单的指标背后,隐藏着对市场微观结构的深刻洞察——通过估算不同价格区间的持仓分布,我们可以量化当前市场的盈亏状态,为买卖决策提供数据支撑。本文将带您从零构建一个Python版的筹码分布分析系统,不仅复现Winner函数的核心逻辑,更会深入探讨如何优化算法性能、处理真实市场数据的各种"坑",以及将这个工具整合到量化研究流程中。

1. 理解筹码分布与Winner函数的数学本质

筹码分布理论的核心假设是:股票的流通筹码会随着交易不断换手,新买入的投资者成本价会取代部分旧持仓。这种迭代过程可以用马尔可夫链来描述——每个交易日的筹码分布只依赖于前一天的分布和当日的交易数据。

关键变量解析

  • 换手率:当日成交量占流通股本的比例,决定多少旧筹码被替换
  • 成交均价:新入场筹码的成本价
  • 累积留存因子:筹码未被换手的概率乘积

让我们用一个简化案例说明这个动态过程。假设某股票流通盘为1亿股,连续三天的交易数据如下:

交易日换手率成交均价(元)10元筹码留存11元新增筹码12元新增筹码
第1天-10.0100%--
第2天20%11.080%20%-
第3天30%12.056%14%30%

对应的Python计算逻辑可以表示为:

def update_holdings(previous, turnover, new_price): """ previous: 前一天的筹码分布字典 {价格:比例} turnover: 当日换手率(0-1) new_price: 当日成交均价 """ updated = {price: ratio*(1-turnover) for price, ratio in previous.items()} updated[new_price] = updated.get(new_price, 0) + turnover return updated

2. 构建工业级Winner函数实现

实际应用中我们需要处理更多复杂情况:停牌日数据处理、复权价格调整、流通股本变动等。下面是一个增强版的实现方案:

import pandas as pd from typing import Dict, List class ChipDistributionAnalyzer: def __init__(self, stock_code: str): self.stock_code = stock_code self.history_data = None def load_market_data(self, start_date: str, end_date: str) -> None: """从数据源加载必要的市场数据""" # 示例使用AKShare获取数据,实际可替换为Tushare或其他源 import akshare as ak df = ak.stock_zh_a_daily(symbol=self.stock_code, start_date=start_date, end_date=end_date, adjust="hfq") # 后复权 # 计算换手率(需注意不同数据源的计算方式可能不同) df['turnover'] = df['volume'] / (df['outstanding_share'] * 10000) # 计算成交均价(金额/成交量) df['avg_price'] = df['amount'] / (df['volume'] * 100) self.history_data = df[['date', 'close', 'avg_price', 'turnover']] def calculate_chip_distribution(self) -> Dict[str, Dict[float, float]]: """计算历史筹码分布""" if self.history_data is None: raise ValueError("请先加载市场数据") chip_history = {} current_dist = {} for idx, row in self.history_data.iterrows(): date = row['date'].strftime('%Y-%m-%d') turnover = row['turnover'] avg_price = round(row['avg_price'], 2) if not current_dist: # 首日处理 current_dist = {avg_price: 1.0} else: current_dist = { price: ratio*(1-turnover) for price, ratio in current_dist.items() } current_dist[avg_price] = current_dist.get(avg_price, 0) + turnover # 移除微小比例(优化内存) current_dist = { p: r for p, r in current_dist.items() if r > 0.0001 } chip_history[date] = current_dist.copy() return chip_history def calculate_winner_ratio(self, chip_data: Dict[str, Dict[float, float]]) -> pd.Series: """计算每日收盘获利比例""" ratios = [] dates = [] for date, dist in chip_data.items(): close_price = self.history_data[self.history_data['date']==date]['close'].values[0] profitable = sum(ratio for price, ratio in dist.items() if price < close_price) ratios.append(profitable) dates.append(date) return pd.Series(ratios, index=pd.to_datetime(dates), name='winner_ratio')

关键优化点

  1. 使用面向对象封装,便于维护和扩展
  2. 支持多种数据源接入(通过替换load_market_data实现)
  3. 自动处理复权价格,确保长期计算的准确性
  4. 动态清理微小比例筹码,优化内存使用

3. 不同数据源的对比与实战问题排查

在实际应用中,数据质量直接影响分析结果的可靠性。我们对比了主流数据源的差异:

数据项Tushare ProAKShare本地数据库
换手率计算成交量/流通股本需自行计算可自定义公式
复权处理提供多种复权选项仅后复权依赖本地处理
历史数据深度依赖套餐权限相对完整取决于采集范围
更新频率15分钟级延迟日级更新实时可控
异常值处理相对规范需自行校验可自定义规则

常见问题排查指南

  1. 换手率异常高

    • 检查流通股本数据是否准确(特别是除权除息日后)
    • 验证成交量单位是否一致(手 vs 股)
    • 示例校验代码:
    def validate_turnover(df): """验证换手率合理性""" suspicious = df[df['turnover'] > 0.5] # 单日换手率超过50%视为异常 if not suspicious.empty: print(f"发现异常换手率日期: {suspicious.index.tolist()}") # 可添加自动修正逻辑或人工确认
  2. 价格跳跃问题

    • 确认是否正确处理了复权(特别是前复权与后复权的区别)
    • 检查停牌日数据是否被错误包含
    • 示例复权处理:
    def adjust_price(raw_df, adjust_type='hfq'): """统一复权处理""" if adjust_type == 'hfq': return raw_df['close'] * raw_df['hfq_factor'] elif adjust_type == 'qfq': return raw_df['close'] * raw_df['qfq_factor'] else: return raw_df['close']
  3. 内存消耗过大

    • 对历史筹码分布进行定期采样(如只保留月末数据)
    • 使用更高效的数据结构(如numpy数组替代字典)
    • 设置筹码比例阈值(如小于0.01%的忽略)

4. 高级应用:筹码分布的多维度分析

基础的Winner函数只能告诉我们当前有多少筹码获利,但结合完整筹码分布数据,我们可以开发更强大的分析工具:

4.1 成本密集区识别

def find_cost_concentration(chip_data: Dict[float, float], bandwidth=0.05) -> List[Tuple[float, float]]: """ 使用核密度估计找出成本密集区 bandwidth: 核密度估计的带宽参数 返回: [(价格1,密度1), (价格2,密度2)...] """ from sklearn.neighbors import KernelDensity import numpy as np prices = [] weights = [] for price, ratio in chip_data.items(): prices.append([price]) weights.append(ratio) prices = np.array(prices) weights = np.array(weights) kde = KernelDensity(bandwidth=bandwidth, kernel='gaussian') kde.fit(prices, sample_weight=weights) x_grid = np.linspace(min(prices)[0], max(prices)[0], 100) log_dens = kde.score_samples(x_grid.reshape(-1, 1)) return list(zip(x_grid, np.exp(log_dens)))

4.2 压力/支撑位分析

通过识别历史筹码密集区,结合当前价格位置,可以判断潜在的压力支撑位:

def analyze_support_resistance(current_price: float, chip_density: List[Tuple[float, float]], threshold=0.8) -> Dict: """ 分析当前价格相对于筹码分布的位置 返回: { 'support': [支撑位列表], 'resistance': [压力位列表], 'position': 当前处于筹码分布的位置百分比 } """ sorted_density = sorted(chip_density, key=lambda x: x[0]) prices = [x[0] for x in sorted_density] densities = [x[1] for x in sorted_density] # 计算累积分布函数 total = sum(densities) cdf = np.cumsum(densities) / total # 找出高于阈值的密集区 peaks = [] in_peak = False for i, (price, density) in enumerate(sorted_density): if density > np.percentile(densities, 80): if not in_peak: peak_start = price in_peak = True else: if in_peak: peak_end = sorted_density[i-1][0] peaks.append((peak_start, peak_end)) in_peak = False # 分类压力支撑位 result = {'support': [], 'resistance': []} current_pos = np.interp(current_price, prices, cdf) for start, end in peaks: if end < current_price: result['support'].append((start, end)) elif start > current_price: result['resistance'].append((start, end)) result['position'] = current_pos return result

4.3 筹码分布可视化

使用Matplotlib创建专业的筹码分布图:

def plot_chip_distribution(chip_data: Dict[float, float], current_price: float = None, ax=None): """绘制筹码分布图""" import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots(figsize=(10, 6)) prices = sorted(chip_data.keys()) ratios = [chip_data[p] for p in prices] ax.bar(prices, ratios, width=0.02, alpha=0.7, color='steelblue') ax.set_xlabel('Price') ax.set_ylabel('Chip Ratio') ax.set_title('Chip Distribution') if current_price is not None: ax.axvline(x=current_price, color='red', linestyle='--', label=f'Current Price: {current_price:.2f}') ax.legend() # 添加成本密集区标记 density = find_cost_concentration(chip_data) dens_prices = [x[0] for x in density] dens_values = [x[1] for x in density] ax2 = ax.twinx() ax2.plot(dens_prices, dens_values, color='darkorange', linewidth=2) ax2.set_ylabel('Density', color='darkorange') return ax

5. 性能优化与批量处理策略

当需要分析多只股票或长时间序列时,原始算法的计算效率会成为瓶颈。以下是几种优化方案:

5.1 向量化计算

使用NumPy的向量运算替代循环:

def vectorized_chip_update(prev_dist: np.ndarray, prev_prices: np.ndarray, turnover: float, new_price: float) -> Tuple[np.ndarray, np.ndarray]: """ 向量化更新筹码分布 prev_dist: 前一天的各价格筹码比例数组 prev_prices: 对应的价格数组 turnover: 当日换手率 new_price: 当日成交均价 返回: (更新后的价格数组, 更新后的分布数组) """ # 留存筹码 new_dist = prev_dist * (1 - turnover) # 处理新价格 if new_price in prev_prices: idx = np.where(prev_prices == new_price)[0][0] new_dist[idx] += turnover else: # 插入新价格 insert_pos = np.searchsorted(prev_prices, new_price) prev_prices = np.insert(prev_prices, insert_pos, new_price) new_dist = np.insert(new_dist, insert_pos, turnover) # 移除微小比例 mask = new_dist > 1e-5 return prev_prices[mask], new_dist[mask]

5.2 多进程处理

对于股票组合分析,使用Python的multiprocessing模块:

from multiprocessing import Pool def analyze_stock(stock_code): analyzer = ChipDistributionAnalyzer(stock_code) analyzer.load_market_data('20200101', '20231231') chip_data = analyzer.calculate_chip_distribution() return stock_code, chip_data def batch_analyze(stock_list, workers=4): """批量分析股票列表""" with Pool(workers) as p: results = p.map(analyze_stock, stock_list) return dict(results)

5.3 记忆化与缓存

使用joblib缓存中间结果,避免重复计算:

from joblib import Memory memory = Memory('./cache_dir', verbose=0) @memory.cache def get_chip_distribution(stock_code, start_date, end_date): analyzer = ChipDistributionAnalyzer(stock_code) analyzer.load_market_data(start_date, end_date) return analyzer.calculate_chip_distribution()

6. 整合到量化研究流程

将筹码分析工具整合到完整的量化研究框架中:

class QuantitativeResearch: def __init__(self): self.data_provider = None self.strategy = None self.portfolio = {} def add_strategy(self, strategy_func): """添加交易策略""" self.strategy = strategy_func def run_backtest(self, start_date, end_date, stock_pool): """运行回测""" results = [] for code in stock_pool: # 获取筹码数据 chip_data = get_chip_distribution(code, start_date, end_date) # 获取价格数据 price_data = self.data_provider.get_price_data(code, start_date, end_date) # 应用策略 signals = self.strategy(price_data, chip_data) # 记录结果 results.append({ 'code': code, 'signals': signals, 'performance': self.calculate_performance(signals, price_data) }) return results def strategy_example(self, price_data, chip_data): """示例策略:基于筹码分布的交易信号""" signals = [] winner_ratios = [] for date, row in price_data.iterrows(): chip = chip_data.get(date.strftime('%Y-%m-%d'), {}) if not chip: signals.append(0) continue close = row['close'] winner_ratio = sum(r for p, r in chip.items() if p < close) winner_ratios.append(winner_ratio) # 简单策略:当获利盘比例低于30%时买入,高于70%时卖出 if winner_ratio < 0.3: signals.append(1) # 买入 elif winner_ratio > 0.7: signals.append(-1) # 卖出 else: signals.append(0) # 持有 price_data['winner_ratio'] = winner_ratios price_data['signal'] = signals return price_data

在实际项目中,我们可以将筹码分析与其他技术指标(如均线、MACD等)结合,构建更复杂的交易策略。一个常见的做法是设置筹码分布的条件筛选:

def screen_stocks_by_chips(stock_list, date, conditions): """ 根据筹码条件筛选股票 conditions示例: { 'winner_ratio': (0.2, 0.4), # 获利盘比例在20%-40%之间 'cost_concentration': 0.3, # 最大成本密集区占比超过30% 'price_position': 0.7 # 当前价格位于筹码分布70%以上位置 } """ qualified = [] for code in stock_list: try: chip_data = get_chip_distribution(code, date, date)[date] analysis = analyze_support_resistance( current_price=get_current_price(code, date), chip_density=find_cost_concentration(chip_data) ) # 检查条件 pass_all = True if 'winner_ratio' in conditions: wr = sum(r for p, r in chip_data.items() if p < get_current_price(code, date)) low, high = conditions['winner_ratio'] if not (low <= wr <= high): pass_all = False if pass_all and 'cost_concentration' in conditions: max_density = max(d for p, d in find_cost_concentration(chip_data)) if max_density < conditions['cost_concentration']: pass_all = False if pass_all and 'price_position' in conditions: if analysis['position'] < conditions['price_position']: pass_all = False if pass_all: qualified.append(code) except Exception as e: print(f"Error processing {code}: {str(e)}") continue return qualified
http://www.jsqmd.com/news/958738/

相关文章:

  • 法律文书智能生成系统上线实录(从试点到全所推广仅47天)
  • 从‘过零点’到‘比特流’:手把手教你用Python仿真复现FSK软件解调全过程(含信号可视化)
  • PyTorch版DnCNN盲去噪完整工程:含训练脚本、测试流程、预训练权重与逐行中文注释
  • 【企业AI工具选型生死线】:从需求映射、数据兼容性到LLM微调支持度——一份被19家 Fortune 500 保密采用的评估矩阵
  • 手把手教你用STM32F103和ESP8266做一个桌面天气时钟(附完整代码和接线图)
  • 成都危险品物流仓储核心技术规范与合规实操指南:成都危险品物流仓储/成都危险品贮存/成都危险货物危险品仓库/危险化学品储存/选择指南 - 优质品牌商家
  • RAID磁盘阵列原理、各级别对比、实战搭建详解
  • 鸿蒙ArkUI实战:步骤表单与进度指示器
  • 免费解锁Wand专业版:终极完整指南与远程控制教程
  • GBase 8s数据库的四种武器之一,图形化管理平台GEM解析
  • 数据预处理实战:分层防御架构与缺失/异常值决策树
  • 如何挑选真正实力派的GEO公司?指南分享
  • 别再手动画图了!用VSCode+PlantUML插件5分钟搞定UML类图(附完整语法速查表)
  • 非参数核聚类与老虎机反馈:理论与应用解析
  • STM32项目从Keil迁移到System Workbench全记录:工程配置、库管理与调试避坑指南
  • 2026年汽车电线线选型评测:储能线线缆、充电桩线缆、新能源电缆、机器人拖链线缆、汽车电线线、逆变器线缆、风能线缆选择指南 - 优质品牌商家
  • 从‘大泥球’到‘乐高积木’:聊聊我们团队踩过的架构坑与Service Mesh救赎之路
  • 实战演练,基于快马平台jdk17环境快速搭建restful api微服务
  • 2026年口碑好的装饰设计专业公司排名,靠谱的品牌推荐 - 工业品牌热点
  • ollama v0.30.5 更新:Hermes Desktop 上线、Windows 安装优化、Gemma4 崩溃修复、Cline CLI 集成文档全量补齐
  • Linux 服务器性能优化基础(CPU/内存/磁盘/网络)
  • 从DAG到值编码:图解编译原理龙书第六章核心概念,手把手教你搞定表达式优化
  • AD9851对比AD9850实战:6倍频到底香不香?实测70MHz+信号生成心得
  • 基于STM32与AD9851的双通道可编程波形发生器,支持基波+5次谐波叠加及三种基础波形输出
  • 技术演进:BepInEx Unity插件框架架构转型与IL2CPP运行时稳定性突破
  • 告别NTP服务器:手把手教你用ESP8266+STM32F103从零搭建一个离线/在线双模天气时钟(附完整代码)
  • 企业AI落地踩坑复盘:只做RAG走不远,ReAct补齐短板
  • 2026年Q2嘉兴奢侈品回收实测:嘉兴名鉴钟表有限公司联系/嘉兴首饰回收/嘉兴奢侈品回收/嘉兴工艺美术品回收/嘉兴黄金回收/选择指南 - 优质品牌商家
  • Linux 下 gcc / g++ 编译过程详解:从编译到链接
  • 实战指南:基于快马ai为django项目生成wsl2一体化开发环境配置脚本