服饰流行周期计算程序,输入单品上线数据,预判款式衰退清仓最佳时间点。
服饰流行周期计算程序 —— 预判款式衰退与清仓最佳时间点
一、实际应用场景描述
在《时尚产业与品牌创新》课程中,服饰流行周期(Fashion Cycle) 是一个核心理论框架。从 "引入期 → 成长期 → 成熟期 → 衰退期 → 滞销期" 的完整生命周期,决定了品牌每一个关键决策的时间窗口:
阶段 典型表现 品牌动作
引入期 (Introduction) 首批 KOL 穿搭曝光,小红书/抖音种草开始 小批量试水,观察数据
成长期 (Growth) 搜索量攀升,竞品跟进,销量周环比 > 15% 追单补货,加大投放
成熟期 (Maturity) 销量 plateau,周环比 ±5% 波动 维持曝光,准备新品衔接
衰退期 (Decline) 销量周环比持续下滑 > 3 周,退货率上升 启动折扣,减少新流量投入
滞销期 (Obsolescence) 库存周转率 < 0.5,仓储成本 > 边际贡献 清仓退出,回收现金流
核心决策问题:一款衣服上架后,什么时候开始打折?什么时候必须清仓?晚一周可能多赚,但也多压一周库存;早一周可能损失利润。这个"最佳清仓时间点"就是本程序要解决的核心问题。
二、引入痛点
2.1 行业现状的"三拍决策"
痛点 真实表现 后果
拍脑袋定折扣 "卖了一个月了,该打折了吧" 要么打折太早损失毛利,要么太晚变死库存
看总量不看趋势 只看累计销量,不看周环比斜率 成熟期误判为衰退,或衰退期误判为季节性波动
统一折扣节奏 所有款一起 618 打折、双 11 打折 不同款的生命周期阶段完全不同,一刀切效率低
缺乏量化预警 等发现"卖不动了"已经晚了 库存积压 → 折价 70% 才能清掉 → 吞噬前期利润
不懂品类差异 连衣裙和羽绒服用同一套节奏 连衣裙衰退以周计,羽绒服以月计
2.2 一个典型损失场景
某独立品牌,一款 ¥899 的碎花连衣裙:
第 1~3 周:周销量 120 → 150 → 180(成长期,✅ 不打折)
第 4~6 周:周销量 185 → 178 → 170(成熟期,🟡 临界区)
第 7~9 周:周销量 152 → 130 → 105(衰退期,❌ 应该打 8 折)
但实际决策:等到第 10 周才打 7 折
结果:库存 200 件 × 折价损失 ¥180/件 = ¥36,000 利润蒸发
核心矛盾:不是"要不要打折",而是"在哪一周打折,能使总利润最大化"——这就是流行周期量化的价值。
三、核心逻辑讲解
3.1 流行周期曲线模型
本程序采用 修正的 Bass 扩散模型 + 指数衰减叠加,将单款销量拟合为:
周销量(t) = 基础扩散曲线(t) × 季节衰减因子(t) × 随机噪声(t)
其中:
基础扩散曲线(Bass 模型):
S(t) = p + (q − p) × F(t) − p × F(t)²
p = 创新系数(KOL/种草驱动,约 0.03~0.08)
q = 模仿系数(大众跟风驱动,约 0.15~0.40)
F(t) = 累积采纳比例
季节衰减因子(品类差异):
连衣裙:快衰减 λ = 0.12(生命周期 8~12 周)
羽绒服:慢衰减 λ = 0.05(生命周期 16~24 周)
T恤: 中衰减 λ = 0.08(生命周期 12~16 周)
衰减公式:e^(−λ × t)
3.2 衰退判定算法
程序通过三重判定确认衰退起点:
判定条件(需同时满足):
① 连续 N 周(默认 3 周)周环比 < 0
② 最近一周销量 < 峰值 × 衰退阈值(默认 70%)
③ 斜率(线性回归)显著为负(p < 0.05)
3.3 最佳清仓时间点算法
核心思路:比较"继续持有"vs"立即清仓"的期望利润
持有策略(再等 w 周):
E[利润(w)] = Σ[P(t) × (1−d(t)) × M] − C_hold × w − C_risk
其中:
P(t) = 第 t 周预测销量
d(t) = 第 t 周自然折扣率(季节性加深)
M = 每件边际贡献(售价 × 毛利率 − 单件可变成本)
C_hold = 周库存持有成本(仓储 + 资金占用 + 贬值)
C_risk = 滞销风险成本(概率 × 最终折价损失)
清仓策略(第 T 周立即打折):
E[利润(T)] = P(T) × (1−d_clear) × M − C_clear_cost
其中:
d_clear = 清仓折扣率(如 0.30 = 7 折)
C_clear_cost = 清仓执行成本(额外推广/直播坑位费)
最优清仓周 = argmax E[利润(w)] over all w
3.4 折扣策略建议
衰退期折扣阶梯(动态定价):
衰退早期(销量降至峰值 70%): 9 折(轻微刺激)
衰退中期(销量降至峰值 50%): 8 折(主动去库存)
衰退晚期(销量降至峰值 30%): 6.5~7 折(清仓价)
滞销期 (库存周转率 < 0.3): 5 折以下(回收现金流)
四、项目结构
fashion_cycle_tracker/
├── config.py # 品类参数配置(衰减率/周期阶段阈值)
├── data_models.py # 数据模型(单品/周销量/生命周期事件)
├── curve_fitter.py # 曲线拟合器(Bass模型 + 季节衰减)
├── decline_detector.py # 衰退检测器(三重判定算法)
├── clearance_optimizer.py # 清仓优化器(利润最大化)
├── discount_strategy.py # 折扣策略生成器
├── report.py # 报告生成(表格 + 可视化)
├── main.py # 主程序入口(含完整示例数据)
├── README.md # 项目说明
└── requirements.txt # 依赖声明
五、代码模块化实现
"requirements.txt"
numpy>=1.24.0
scipy>=1.10.0
matplotlib>=3.7.0
"config.py"
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
config.py
品类参数配置中心:不同服饰品类的生命周期特征参数
"""
from typing import Dict
import numpy as np
# ========== 品类生命周期参数 ==========
# 基于时尚产业研究的典型值
CATEGORY_PARAMS = {
# 连衣裙:春夏核心品类,生命周期短,衰减快
"dress": {
"bass_p": 0.06, # 创新系数(种草敏感度)
"bass_q": 0.35, # 模仿系数(跟风效应强)
"decay_lambda": 0.12, # 衰减速率(快)
"typical_weeks": 10, # 典型总生命周期(周)
"peak_week_range": (3, 6), # 峰值通常出现在第几周
"seasonal_strength": 0.8, # 季节性强度(高)
},
# 羽绒服:秋冬核心品类,生命周期长,衰减慢
"down_jacket": {
"bass_p": 0.04,
"bass_q": 0.25,
"decay_lambda": 0.05,
"typical_weeks": 20,
"peak_week_range": (5, 10),
"seasonal_strength": 0.9,
},
# T恤/基础款:全年可穿,生命周期中等
"tshirt": {
"bass_p": 0.05,
"bass_q": 0.30,
"decay_lambda": 0.08,
"typical_weeks": 14,
"peak_week_range": (4, 8),
"seasonal_strength": 0.4,
},
# 牛仔裤:经典款,生命周期最长
"jeans": {
"bass_p": 0.03,
"bass_q": 0.20,
"decay_lambda": 0.04,
"typical_weeks": 24,
"peak_week_range": (6, 12),
"seasonal_strength": 0.3,
},
# 运动服/瑜伽裤:功能驱动,衰减中等偏快
"activewear": {
"bass_p": 0.07,
"bass_q": 0.38,
"decay_lambda": 0.10,
"typical_weeks": 12,
"peak_week_range": (3, 7),
"seasonal_strength": 0.5,
},
}
# ========== 衰退判定阈值 ==========
DECLINE_THRESHOLDS = {
"min_consecutive_weeks": 3, # 至少连续 N 周下滑
"peak_ratio_threshold": 0.70, # 销量降至峰值的 X% 以下
"slope_p_value": 0.10, # 斜率显著性阈值
"min_weeks_data": 5, # 最少需要多少周数据才判定
}
# ========== 清仓策略参数 ==========
CLEARANCE_PARAMS = {
"holding_cost_per_unit_week": 8.0, # 每件每周持有成本(仓储+资金)
"risk_discount_rate": 0.15, # 滞销风险折价率
"clearance_execution_cost": 500, # 清仓执行成本(直播/推广)
"discount_tiers": { # 折扣阶梯
0.70: "衰退早期(温和刺激)",
0.50: "衰退中期(主动去库存)",
0.30: "衰退晚期(清仓价)",
0.15: "滞销期(回收现金流)",
},
}
# ========== 可视化配色 ==========
COLORS = {
"introduction": "#4CAF50", # 绿 - 引入期
"growth": "#2196F3", # 蓝 - 成长期
"maturity": "#FF9800", # 橙 - 成熟期
"decline": "#F44336", # 红 - 衰退期
"obsolescence": "#9C27B0", # 紫 - 滞销期
"predicted": "#BDBDBD", # 灰 - 预测区间
"clearance": "#FF5722", # 深橙 - 清仓点
"peak": "#E91E63", # 粉 - 峰值
}
"data_models.py"
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
data_models.py
数据模型层:单品定义、周销量数据、生命周期事件记录
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import numpy as np
from datetime import date, timedelta
@dataclass
class Product:
"""
单品定义
"""
product_id: str # 商品编号,如 "DR-2024-001"
name: str # 商品名称
category: str # 品类(dress/down_jacket/tshirt/jeans/activewear)
price: float # 售价(元)
gross_margin: float # 毛利率(0~1)
launch_date: date # 上架日期
initial_stock: int # 初始库存(件)
variable_cost_per_unit: float = 0.0 # 单件可变成本(包装/物流等)
def __post_init__(self):
if self.category not in CATEGORY_PARAMS:
raise ValueError(f"未注册的品类: {self.category}")
if self.price <= 0:
raise ValueError("售价必须大于零")
if not (0 < self.gross_margin <= 1):
raise ValueError("毛利率必须在 0~1 之间")
def marginal_contribution(self) -> float:
"""每件边际贡献 = 售价 × 毛利率 − 单件可变成本"""
return self.price * self.gross_margin - self.variable_cost_per_unit
@dataclass
class WeeklySales:
"""
周销量数据(支持逐步录入,模拟真实追踪场景)
"""
product_id: str
sales: Dict[int, int] = field(default_factory=dict) # {周序号: 销量}
weeks_recorded: int = 0
def add_week(self, week_num: int, units_sold: int) -> None:
"""录入一周销量"""
if units_sold < 0:
raise ValueError("销量不能为负")
self.sales[week_num] = units_sold
self.weeks_recorded = max(self.weeks_recorded, week_num + 1)
def get_array(self, max_weeks: Optional[int] = None) -> np.ndarray:
"""转为连续数组,缺失周填 0"""
if max_weeks is None:
max_weeks = self.weeks_recorded
arr = np.zeros(max_weeks)
for w, v in self.sales.items():
if w < max_weeks:
arr[w] = v
return arr
def get_peak(self) -> Tuple[int, int]:
"""返回 (峰值周序号, 峰值销量)"""
if not self.sales:
return (-1, 0)
peak_w = max(self.sales, key=self.sales.get)
return (peak_w, self.sales[peak_w])
def week_over_week(self) -> np.ndarray:
"""计算周环比增长率"""
arr = self.get_array()
if len(arr) < 2:
return np.array([])
return (arr[1:] - arr[:-1]) / np.where(arr[:-1] > 0, arr[:-1], 1)
@dataclass
class LifecycleEvent:
"""
生命周期事件记录
"""
event_type: str # introduction/growth/maturity/decline/obsolescence/clearance
week: int # 发生周次
description: str = ""
metadata: Dict = field(default_factory=dict)
"curve_fitter.py"
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
curve_fitter.py
曲线拟合器:用 Bass 扩散模型 + 指数衰减拟合/预测销量曲线
"""
import numpy as np
from scipy.optimize import curve_fit
from typing import Dict, Tuple, Optional
from config import CATEGORY_PARAMS
from data_models import Product, WeeklySales
class BassCurveFitter:
"""
Bass 扩散模型拟合器
核心公式:
S(t) = (p + q × F(t)) × (1 − F(t))
F(t) = 1 − e^(−(p+q)×t) / (1 + (q/p) × e^(−(p+q)×t))
叠加季节衰减:
S_final(t) = S(t) × e^(−λ×t)
"""
def __init__(self, product: Product, weekly_sales: WeeklySales):
self.product = product
self.weekly_sales = weekly_sales
self.category_params = CATEGORY_PARAMS[product.category]
# 拟合结果缓存
self.fitted_params: Optional[Tuple[float, float, float]] = None
self.fitted_curve: Optional[np.ndarray] = None
@staticmethod
def bass_function(t: np.ndarray, p: float, q: float, m: float) -> np.ndarray:
"""
Bass 模型核心函数
t: 时间数组
p: 创新系数
q: 模仿系数
m: 市场总潜力(最大累计采纳数)
"""
# 累计采纳比例 F(t)
if p + q == 0:
f_t = np.zeros_like(t, dtype=float)
else:
exp_term = np.exp(-(p + q) * t)
f_t = (1 - exp_term) / (1 + (q / p) * exp_term) if p > 0 else np.zeros_like(t)
# 当期采纳数(即销量)
s_t = m * (p + q * f_t) * (1 - f_t)
return s_t
def seasonal_decay(self, t: np.ndarray) -> np.ndarray:
"""季节衰减因子"""
lam = self.category_params["decay_lambda"]
strength = self.category_params["seasonal_strength"]
# 基础衰减
decay = np.exp(-lam * t)
# 叠加周期性波动(模拟时尚周期的"长尾")
seasonal = 1 + 0.15 * strength * np.sin(2 * np.pi * t / 52)
return decay * seasonal
def full_model(self, t: np.ndarray, p: float, q: float, m: float) -> np.ndarray:
"""完整模型 = Bass + 季节衰减"""
bass = self.bass_function(t, p, q, m)
decay = self.seasonal_decay(t)
return bass * decay
def fit(self, max_weeks: int = 30) -> Dict:
"""
用已有数据拟合模型参数
Returns:
拟合结果字典,包含参数、拟合曲线、预测曲线
"""
observed = self.weekly_sales.get_array(max_weeks)
t = np.arange(len(observed))
if len(observed) < 4:
raise ValueError("数据不足 4 周,无法拟合")
# 初始参数猜测
p0 = self.category_params["bass_p"]
q0 = self.category_params["bass_q"]
m0 = observed.max() * 3 # 市场潜力约为峰值的 3 倍
try:
popt, pcov = curve_fit(
self.full_model,
t,
observed,
p0=[p0, q0, m0],
bounds=([0, 0, 0], [1, 1, 1e6]),
maxfev=5000,
)
self.fitted_params = tuple(popt)
# 生成拟合曲线(已有数据部分)
self.fitted_curve = self.full_model(t, *popt)
# 预测未来(再预测 max_weeks 周)
t_future = np.arange(max_weeks)
future_curve = self.full_model(t_future, *popt)
return {
"status": "success",
"params": {
"p (创新系数)": round(popt[0], 4),
"q (模仿系数)": round(popt[1], 4),
"m (市场潜力)": round(popt[2], 1),
},
"fitted_weeks": len(observed),
"fitted_curve": self.fitted_curve,
"predicted_curve": future_curve,
"predicted_weeks": max_weeks,
"rss": float(np.sum((observed - self.fitted_curve) ** 2)),
}
except Exception as e:
return {
"status": "failed",
"error": str(e),
"params": {"p": p0, "q": q0, "m": m0},
}
def predict_future(self, total_weeks: int = 20) -> np.ndarray:
"""预测未来销量"""
if self.fitted_params is None:
raise ValueError("请先调用 fit() 拟合模型")
t = np.arange(total_weeks)
return self.full_model(t, *self.fitted_params)
def identify_stage(self, week: int) -> str:
"""
根据拟合曲线判断某周所处的生命周期阶段
"""
if self.fitted_params is None:
return "unknown"
t = np.arange(max(week + 1, 10))
curve = self.full_model(t, *self.fitted_params)
if week >= len(curve):
return "obsolescence"
# 找到峰值位置
peak_idx = int(np.argmax(curve))
peak_val = curve[peak_idx]
current_val = curve[week]
peak_ratio = current_val / peak_val if peak_val > 0 else 0
if week < peak_idx * 0.5:
return "introduction"
elif week < peak_idx:
return "growth"
elif peak_ratio > 0.70:
return "maturity"
elif peak_ratio > 0.35:
return "decline"
else:
return "obsolescence"
"decline_detector.py"
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
decline_detector.py
衰退检测器:三重判定算法识别款式进入衰退期的精确时间点
"""
import numpy as np
from typing import Dict, List, Tuple
from scipy import stats
from config import DECLINE_THRESHOLDS
from data_models import WeeklySales
class DeclineDetector:
"""
衰退检测算法
三重判定:
① 连续 N 周销量环比下滑
② 销量降至峰值 X% 以下
③ 线性回归斜率显著为负
"""
def __init__(self, weekly_sales: WeeklySales):
self.sales = weekly_sales
self.thresholds = DECLINE_THRESHOLDS
def check_consecutive_decline(self, window: int = 3) -> Dict:
"""
判定①:检查是否存在连续 N 周下滑
Returns:
{detected: bool, start_week: int, decline_weeks: int}
"""
arr = self.sales.get_array()
if len(arr) < window:
return {"detected": False, "reason": "数据不足"}
# 滑动窗口检测
max_consecutive = 0
current_consecutive = 0
decline_start = -1
for i in range(1, len(arr)):
if arr[i] < arr[i - 1]:
current_consecutive += 1
if current_consecutive == 1:
decline_start = i - 1
max_consecutive = max(max_consecutive, current_consecutive)
else:
current_consecutive = 0
min_required = self.thresholds["min_consecutive_weeks"]
detected = max_consecutive >= min_required
return {
"detected": detected,
"max_consecutive_decline": max_consecutive,
"decline_start_week": decline_start if detected else -1,
"reason": f"最长连续下滑 {max_consecutive} 周" if detected else "未检测到连续下滑",
}
def check_peak_ratio(self) -> Dict:
"""
判定②:当前销量是否降至峰值的 X% 以下
Returns:
{detected: bool, current_ratio: float, peak_week: int, peak_value: int}
"""
peak_week, peak_val = self.sales.get_peak()
if peak_week < 0:
return {"detected": False, "reason": "无销量数据"}
arr = self.sales.get_array()
threshold = self.thresholds["peak_ratio_threshold"]
# 检查最近几周是否低于阈值
recent_weeks = 3
detected = False
for i in range(max(0, len(arr) - recent_weeks), len(arr)):
ratio = arr[i] / peak_val if peak_val > 0 else 0
if ratio < threshold:
detected = True
break
current_ratio = arr[-1] / peak_val if peak_val > 0 and len(arr) > 0 else 0
return {
"detected": detected,
"current_ratio": round(current_ratio, 3),
"peak_week": peak_week,
"peak_value": peak_val,
"threshold": threshold,
"reason": f"当前销量是峰值的 {current_ratio*100:.1f}%" if detected else "仍在峰值以上",
}
def check_slope_significance(self, window: int = 5) -> Dict:
"""
判定③:线性回归斜率是否显著为负
Returns:
{detected: bool, slope: float, p_value: float}
"""
arr = self.sales.get_array()
if len(arr) < window:
return {"detected": False, "reason": "数据不足"}
# 取最近 window 周做线性回归
recent = arr[-window:]
t = np.arange(window)
slope, intercept, r_value, p_value, std_err = stats.linregress(t, recent)
alpha = self.thresholds["slope_p_value"]
detected = slope < 0 and p_value < alpha
return {
"detected": detected,
"slope": round(slope, 2),
"r_squared": round(r_value ** 2, 3),
"p_value": round(p_value, 4),
"reason": f"斜率={slope:.1f}, p={p_value:.3f}" if detected else "斜率不显著",
}
def detect_decline(self) -> Dict:
"""
三重判定汇总
Returns:
综合判定结果,包含各子判定详情和最终结论
"""
if self.sales.weeks_recorded < self.thresholds["min_weeks_data"]:
return {
"status": "insufficient_data",
"weeks_recorded": self.sales.weeks_recorded,
"min_required": self.thresholds["min_weeks_data"],
"decline_detected": False,
}
result_1 = self.check_consecutive_decline()
result_2 = self.check_peak_ratio()
result_3 = self.check_slope_significance()
# 三重判定汇总
count = sum([
result_1["detected"],
result_2["detected"],
result_3["detected"],
])
# 至少满足 2/3 才判定为衰退
decline_detected = count >= 2
# 衰退起点 = 最严格的判定(连续下滑起点)
decline_week = -1
if decline_detected:
candidates = []
if result_1["detected"]:
candidates.append(result_1["decline_start_week"])
if result_2["detected"]:
candidates.append(result_2.get("peak_week", -1))
if result_3["detected"]:
candidates.append(self.sales.weeks_recorded - 1)
decline_week = min(c for c in candidates if c >= 0)
return {
"status": "analyzed",
"decline_detected": decline_detected,
"decline_week": decline_week,
"confidence": f"{count}/3 条件满足",
"checks": {
"连续下滑检测": result_1,
"峰值比例检测": result_2,
"斜率显著性检测": result_3,
},
"recommendation": self._get_recommendation(
decline_detected, count, decline_week
),
}
def _get_recommendation(
self, detected: bool, count: int, week: int
) -> str:
"""根据判定结果给出建议"""
if not detected:
if count == 0:
return "款式仍处于健康销售期,维持正常运营"
elif count == 1:
return "出现早期衰退信号,建议密切关注后续 1~2 周数据"
else:
return "数据矛盾,建议人工复核"
else:
return f"⚠️ 衰退已确认(第 {week} 周起),建议启动折扣/清仓评估"
class LifecycleTracker:
"""
生命周期阶段追踪器
基于销量曲线的形态特征,自动标注每个阶段
"""
@staticmethod
def identify_stages(sales_array: np.ndarray) -> List[Tuple[int, str]]:
"""
基于销量曲线的二阶导数识别阶段转换点
Returns:
[(week, stage_name), ...] 按时间排序的阶段列表
"""
if len(sales_array) < 4:
return [(0, "introduction")]
stages = []
arr = sales_array.copy()
# 一阶导数(变化率)
diff1 = np.diff(arr)
# 二阶导数(加速度)
diff2 = np.diff(diff1)
# 找峰值
peak_idx = int(np.argmax(arr))
# 找拐点(二阶导数过零)
inflection_points = []
for i in range(1, len(diff2)):
if diff2[i] * diff2[i - 1] < 0:
inflection_points.append(i + 1)
# 阶段划分逻辑
# 引入期:0 ~ 第一个拐点(或峰值的 1/3)
intro_end = inflection_points[0] if inflection_points else peak_idx // 3
stages.append((0, "introduction"))
# 成长期:拐点 ~ 峰值
if intro_end < peak_idx:
stages.append((intro_end, "growth"))
# 成熟期:峰值 ~ 峰值后第一个拐点
decline_start = peak_idx + 1
for ip in inflection_points:
if ip > peak_idx:
decline_start = ip
break
stages.append((peak_idx, "maturity"))
# 衰退期:拐点 ~ 低谷
if decline_start < len(arr) - 1:
stages.append((decline_start, "decline"))
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!
