[特殊字符] 从零搭一个淘宝商品价格监控系统:TOP API + 定时任务 + 微信推送(附Python源码)
📉 从零搭一个淘宝商品价格监控系统:TOP API + 定时任务 + 微信推送(附Python源码)
用淘宝官方taobao.item.get定时拉商品价格 → 降价/涨价超阈值 →推企业微信/飞书/钉钉 Webhook。全程合法、无爬虫、可7×24运行。
一、监控架构
┌──────────────┐ 每N分钟 ┌────────────────────┐ │ APScheduler │ ──────────────▶ │ TOP API │ │ 定时任务 │ │ taobao.item.get │ └──────┬───────┘ └─────────┬──────────┘ │ 价格变化 Δ > threshold │ ▼ │ ┌──────────────────┐ Webhook │ │ 企业微信/钉钉Bot │ ◀────────────────────┘ └──────────────────┘ (Markdown消息)二、完整可运行代码
# taobao_price_monitor.py """ 淘宝商品价格监控系统 - 跟踪关注商品(num_iid)价格 - 变化幅度超 threshold 推企业微信机器人 - SQLite 本地存快照(首次建表自动) 依赖: requests (pip install requests) """ import hashlib import time import requests import sqlite3 import logging from typing import Dict, Optional from datetime import datetime # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex # ── 配置区 ────────────────────────────────────── APP_KEY = "YOUR_TOP_APP_KEY" APP_SECRET = "YOUR_TOP_APP_SECRET" SANDBOX = False # 生产=False CHECK_INTERVAL_SEC = 1800 # 30分钟 PRICE_CHANGE_THRESHOLD = 0.05 # 涨跌超5%触发告警 WECOM_BOT_WEBHOOK = ( # 企业微信群机器人Webhook "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY" ) WATCH_LIST = [ # 监控商品列表 {"num_iid": "654321098765", "alias": "304不锈钢保温杯 500ml"}, # {"num_iid": "612345678901", "alias": "其它监控商品"}, ] # ──────────────────────────────────────────────── # ============ TOP Client (内联最小化版) ============ class TopClient: GW_PROD = "https://gw.api.taobao.com/router/rest" GW_SBOX = "https://gw.api.tbsandbox.com/router/rest" def __init__(self, ak, ask, sandbox=False): self.ak, self.ask = ak, ask self.gw = self.GW_SBOX if sandbox else self.GW_PROD def _sign(self, p: Dict) -> str: filt = sorted((k, v) for k, v in p.items() if v is not None and str(v).strip() != '' and k != 'sign') qs = ''.join(f"{k}{v}" for k, v in filt) return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper() def call(self, method: str, biz: Dict, session=None) -> Dict: p = {"method": method, "app_key": self.ak, "timestamp": str(int(time.time() * 1000)), "format": "json", "v": "2.0", "sign_method": "md5"} if session: p["session"] = session p.update(biz) p["sign"] = self._sign(p) r = requests.post(self.gw, data=p, timeout=15) r.raise_for_status() d = r.json() if "error_response" in d: err = d["error_response"] raise Exception(f"TOP[{err.get('code')}]: {err.get('msg')} {err.get('sub_msg','')}") return d.get(list(d.keys() - {"error_response"})[0], {}) def get_price(self, num_iid: str) -> float: resp = self.call("taobao.item.get", { "num_iid": num_iid, "fields": "num_iid,title,price,approve_status" }) item = resp.get("item", {}) if item.get("approve_status") != "onsale": raise ValueError(f"商品{num_iid} 已下架/仓库中") return float(item.get("price") or 0) # =================================================== # ============ SQLite 价格快照 ==================== def init_db(db_path="price_monitor.db"): conn = sqlite3.connect(db_path) conn.execute(""" CREATE TABLE IF NOT EXISTS price_snapshot( num_iid TEXT PRIMARY KEY, last_price REAL, updated_at TEXT ) """) conn.commit() return conn def load_last_price(conn, num_iid: str) -> Optional[float]: cur = conn.execute("SELECT last_price FROM price_snapshot WHERE num_iid=?", (num_iid,)) row = cur.fetchone() return row[0] if row else None def save_price(conn, num_iid: str, price: float): conn.execute(""" INSERT INTO price_snapshot(num_iid,last_price,updated_at) VALUES(?,?,?) ON CONFLICT(num_iid) DO UPDATE SET last_price=excluded.last_price, updated_at=excluded.updated_at """, (num_iid, price, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) conn.commit() # =================================================== # ============ 企业微信推送 ======================= def push_wecom(title: str, old: float, new: float, alias: str, num_iid: str): if not WECOM_BOT_WEBHOOK: return direction = "📉 降价" if new < old else "📈 涨价" pct = abs(new - old) / old * 100 if old else 0 content = ( f"## 🛒 淘宝商品价格变动通知\n" f"**商品**:{alias}\n" f"**ID**:`{num_iid}`\n" f"**{direction}**:¥{old:.2f} → **¥{new:.2f}** (±{pct:.1f}%)\n" f"> 监控时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) try: requests.post(WECOM_BOT_WEBHOOK, json={ "msgtype": "markdown", "markdown": {"content": content} }, timeout=10) except Exception as e: logging.warning(f"微信推送失败: {e}") # ============ 核心监控逻辑 ======================= def monitor_once(client: TopClient, conn): for w in WATCH_LIST: num_iid = str(w["num_iid"]) alias = w.get("alias") or num_iid try: cur_price = client.get_price(num_iid) last_price = load_last_price(conn, num_iid) if last_price is None: # 首次记录,不告警 save_price(conn, num_iid, cur_price) print(f"[INIT] {alias} 初始价格 ¥{cur_price:.2f}") continue diff_ratio = abs(cur_price - last_price) / last_price if last_price else 0 if diff_ratio >= PRICE_CHANGE_THRESHOLD: print(f"[ALERT] {alias} ¥{last_price:.2f}→¥{cur_price:.2f} " f"(Δ{diff_ratio*100:.1f}%)") push_wecom(alias, last_price, cur_price, alias, num_iid) else: print(f"[OK] {alias} 价格稳定 ¥{cur_price:.2f}") save_price(conn, num_iid, cur_price) except ValueError as ve: print(f"[WARN] {alias}: {ve}") except Exception as e: print(f"[ERR] {alias}: {e}") # ============ 入口 =============================== if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") client = TopClient(APP_KEY, APP_SECRET, sandbox=SANDBOX) conn = init_db() print(f"▶ 淘宝价格监控启动,间隔={CHECK_INTERVAL_SEC}s,阈值={PRICE_CHANGE_THRESHOLD*100}%") monitor_once(client, conn) # 立即执行一次 # ---- 常驻定时(取消注释启用)---- # import apscheduler # from apscheduler.schedulers.blocking import BlockingScheduler # sched = BlockingScheduler() # sched.add_job(lambda: monitor_once(client, conn), # 'interval', seconds=CHECK_INTERVAL_SEC, id='price_monitor') # try: # sched.start() # except (KeyboardInterrupt, SystemExit): # sched.shutdown()三、配置步骤
安装依赖
pip install requests apscheduler # apscheduler 仅常驻时使用填写配置
APP_KEY/APP_SECRET:淘宝开放平台应用WECOM_BOT_WEBHOOK:企业微信群 → 添加机器人 → 复制 Webhook URLWATCH_LIST:填入要监控的num_iid(从商品URLid=提取)
运行
python taobao_price_monitor.py首次运行记录基准价不发告警,后续价格变动 ±5%(可改PRICE_CHANGE_THRESHOLD)推企微。
常驻(取消代码尾部注释)
nohup python taobao_price_monitor.py > monitor.log 2>&1 &四、避坑
坑 | 现象 | 解决 |
|---|---|---|
取不到 price | 商品下架 | 代码已捕获跳过 |
Invalid Signature | 时间戳秒级 | 确保 |
企�推送无反应 | Webhook key错 / IP受限 | 企业微信后台查机器人日志 |
QPS触发限流 | 监控商品过多 | 令牌桶限速 或 加大 |
num_iid 错 | 复制完整URL未提取ID | 只保留数字部分 |
五、面试/方案一句话
淘宝价格监控 =定时任务调
taobao.item.get(fields=price)取一口价 → SQLite存上次快照 → 差价超阈值调企业微信Markdown Webhook推送;全程用官方TOP API合法获取,不爬页面。
需要我补钉钉/飞书 Webhook 消息格式 或多SKU规格价监控(查 skus[].price) 吗?
