当前位置: 首页 > news >正文

影刀RPA实战:从零搭建电商数据采集系统

影刀RPA实战:从零搭建电商数据采集系统

作者:林焱|阅读时间:约12分钟|难度:⭐⭐⭐ 实战

这是本文集的收官之作——一个完整的端到端项目实战。我们将从零开始,搭建一套生产级的电商数据采集系统,涵盖采集、清洗、入库、分析、报表、通知全流程。


一、项目背景与需求

1.1 项目背景

某电商运营团队每天需要监控多个平台的竞品价格和库存信息:

  • 淘宝:约500个SKU
  • 京东:约300个SKU
  • 拼多多:约400个SKU

目前由3名运营人员手工操作,每人每天花2小时复制粘贴数据到Excel。

目标:用 RPA 实现全自动采集,释放人力去做更有价值的工作。

1.2 需求清单

需求项说明
多平台采集支持淘宝/京东/拼多多三大平台
增量采集只采集有变化的商品(价格/库存变动)
数据持久化存入 SQLite 数据库,支持历史趋势查询
异常保护单个商品失败不影响整体流程
变动告警价格变动超过阈值时发送钉钉通知
自动报表每天8:30自动生成Excel报表并发邮件
运行日志记录每次运行的详细日志

1.3 技术架构图

┌─────────────────────────────────────────────────────┐ │ 电商数据采集系统架构 │ ├─────────────────────────────────────────────────────┤ │ │ │ ┌──────────┐ │ │ │ 定时调度 │ ← 每天7:00触发 │ │ └────┬─────┘ │ │ ↓ │ │ ┌──────────────────────────────────────┐ │ │ │ 主控流程 orchestrator │ │ │ │ │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ │ │淘宝采集 │ │京东采集 │ │拼多多采 │ │ │ │ │ │ taobao │ │ jd │ │ pdd │ │ │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ └─────────────┼─────────────┘ │ │ │ │ ↓ │ │ │ │ ┌─────────────────┐ │ │ [video(video-94eiYjyp-1781858908008)(type-csdn)(url-https://live.csdn.net/v/embed/525000)(image-https://v-blog.csdnimg.cn/asset/23da3fe1f67a47106d725406cfde9a97/cover/Cover0.jpg)(title-拼多多店群自动化上架方案)] │ │ │ 数据清洗去重 │ │ │ │ │ └────────┬────────┘ │ │ │ │ ↓ │ │ │ │ ┌──────────────┼──────────────┐ │ │ │ │ ↓ ↓ ↓ │ │ │ │ ┌──────┐ ┌──────────┐ ┌─────┐ │ │ │ │ │SQLite│ │ 变动检测 │ │报表 │ │ │ │ │ │ 入库 │ │ +告警通知│ │生成 │ │ │ │ │ └──────┘ └────┬─────┘ └──┬──┘ │ │ │ │ ↓ ↓ │ │ │ │ ┌──────────┐ ┌─────┐ │ │ │ │ │ 钉钉通知 │ │邮件 │ │ │ │ │ └──────────┘ └─────┘ │ │ │ └───────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────┘

二、数据库设计

2.1 表结构设计

-- ================================-- 表1:商品主信息表-- ================================CREATETABLEIFNOTEXISTSproducts(idINTEGERPRIMARYKEYAUTOINCREMENT,sku_idTEXTNOTNULLUNIQUE,-- 商品唯一标识nameTEXT,-- 商品名称platformTEXTNOTNULL,-- 平台 (taobao/jd/pdd)categoryTEXT,-- 分类shop_nameTEXT,-- 店铺名称product_urlTEXT,-- 商品链接image_urlTEXT,-- 图片链接is_activeINTEGERDEFAULT1,-- 是否在售(0=下架)created_atDATETIMEDEFAULTCURRENT_TIMESTAMP,updated_atDATETIMEDEFAULTCURRENT_TIMESTAMP);-- ================================-- 表2:价格历史记录表(核心表)-- ================================CREATETABLEIFNOTEXISTSprice_history(idINTEGERPRIMARYKEYAUTOINCREMENT,sku_idTEXTNOTNULL,platformTEXTNOTNULL,priceREAL,-- 当前价格original_priceREAL,-- 原价(如有折扣)stock_statusTEXTDEFAULT'unknown',-- 库存状态(in_stock/out_of_stock)sales_countINTEGERDEFAULT0,-- 销量/评价数collected_atDATETIMEDEFAULTCURRENT_TIMESTAMP,FOREIGNKEY(sku_id)REFERENCESproducts(sku_id),UNIQUE(sku_id,DATE(collected_at))-- 同一商品每天只保留一条);-- 创建索引加速查询CREATEINDEXidx_price_skuONprice_history(sku_id);CREATEINDEXidx_price_dateONprice_history(collected_at);CREATEINDEXidx_price_platformONprice_history(platform);CREATEINDEXidx_products_platformONproducts(platform);CREATEINDEXidx_products_categoryONproducts(category);-- ================================-- 表3:变动告警记录表-- ================================CREATETABLEIFNOTEXISTSalerts(idINTEGERPRIMARYKEYAUTOINCREMENT,sku_idTEXT,alert_typeTEXT,-- 'price_up'/'price_down'/'out_of_stock'/'back_in_stock'old_valueREAL,new_valueREAL,change_pctREAL,-- 变化百分比thresholdREAL,-- 触发的阈值notifiedINTEGERDEFAULT0,-- 是否已通知(0=未通知/1=已通知)notified_atDATETIME,created_atDATETIMEDEFAULTCURRENT_TIMESTAMP);-- ================================-- 表4:运行日志表-- ================================CREATETABLEIFNOTEXISTSrun_logs(idINTEGERPRIMARYKEYAUTOINCREMENT,run_dateDATE,platformTEXT,statusTEXT,-- 'running'/'success'/'failed'/'partial'total_itemsINTEGERDEFAULT0,success_itemsINTEGERDEFAULT0,failed_itemsINTEGERDEFAULT0,new_itemsINTEGERDEFAULT0,-- 新发现的商品数changed_itemsINTEGERDEFAULT0,-- 有变动的商品数duration_secondsREAL,error_msgTEXT,created_atDATETIMEDEFAULTCURRENT_TIMESTAMP);

2.2 设计说明

为什么要这样设计? 1. products 和 price_history 分开: → 商品基本信息不常变(名称/分类/链接) → 价格数据每天都在变化 → 分开后查询更快,存储更高效 2. UNIQUE(sku_id, DATE) 约束: → 同一商品同一天只保留一条最新价格 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55af049b85c648e9a0cf7c3710140d74.png#pic_center) ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/b8b79497c7e44ee7a90608f7e0cd274b.png#pic_center) → 天然实现了增量更新(重复插入会冲突) 3. alerts 表独立出来: → 告警逻辑和数据采集解耦 → 可以单独查看所有历史告警 → 支持告警的"已读/未读"管理

三、核心模块实现

模块1:多平台采集器

=========================================== 子流程:platform_collector 输入参数:platform (平台名),sku_list (商品列表) 输出参数:raw_data (原始采集数据),stats (统计信息) =========================================== 【初始化】 raw_data =[]stats ={"total":len(sku_list),"success":0,"failed":0}【根据平台选择不同的采集策略】if platform == "taobao":# 淘宝采集策略打开("https://www.taobao.com") 登录淘宝账号()For Each sku in sku_list:Try:# 方式A:通过商品链接直接打开详情页打开(sku.url) 等待页面加载(5秒)# 提取信息current_price = 获取元素文本(".Price--priceText") original_price = 获取元素文本(".Price--original") stock_info = 获取元素文本(".StockStatus") record ={"sku_id":sku.id,"name":sku.name,"platform":"taobao","price":提取数字(current_price),"original_price":提取数字(original_price),"stock":"有货" if "有货" in stock_info else "无货","collected_time":当前时间()}raw_data.append(record) stats["success"]+= 1Catch:stats["failed"]+= 1log(f"淘宝采集失败:{sku.name}")# 反爬策略:随机延迟等待(随机2~6秒)elif platform == "jd":# 京东采集策略(类似,适配京东页面结构)打开("https://www.jd.com")...For Each sku in sku_list:Try:# 京东可以用搜索+列表方式批量获取# 或者逐个打开详情页...Catch:stats["failed"]+= 1elif platform == "pdd":# 拼多多采集策略(注意反爬更严格)打开("https://www.pinduoduo.com")...# 拼多多可能需要更长的延迟(5~10秒)# 以及更多的Cookie维护【返回结果】 return raw_data,stats

模块2:数据清洗与入库

=========================================== 子流程:data_pipeline 输入参数:raw_data,platform 输出参数:new_count,changed_count =========================================== new_count = 0 changed_count = 0 alert_candidates =[]# 可能需要告警的商品BEGIN TRANSACTIONFor Each item in raw_data:【Step 1:数据清洗】# 价格校验if item.price <= 0 or item.price > 999999:item.price = None# 无效价格标记为空log(f"异常价格:{item.sku_id}={item.price}")# 名称清洗item.name = 去除首尾空格(item.name)if len(item.name) > 200:item.name = item.name[:200]【Step 2:判断是新增还是已有商品】 existing = SELECT * FROM products WHERE sku_id = item.sku_idif existing 为空:# ===== 新商品 =====INSERT INTO products (sku_id,name,platform,shop_name,product_url) VALUES (item.sku_id,item.name,item.platform,...,...) new_count += 1else:# ===== 已有商品:检查是否有变化 =====last_price = SELECT price FROM price_history WHERE sku_id = item.sku_id ORDER BY collected_at DESC LIMIT 1if last_price 不为空:if last_price != item.price and item.price 不为 None:# 价格变了!计算变化幅度change_pct = (item.price-last_price) / last_price * 100 changed_count += 1if abs(change_pct) >= 10:# 变化超过10%alert_candidates.append({"sku_id":item.sku_id,"type":"price_up" if change_pct>0 else "price_down","old":last_price,"new":item.price,"pct":change_pct})【Step 3:写入价格历史】# 使用 INSERT OR REPLACE 处理同日重复INSERT OR REPLACE INTO price_history (sku_id,platform,price,original_price,stock_status,sales_count) VALUES (item.sku_id,item.platform,item.price,...) COMMIT TRANSACTION# 将告警候选写入alerts表For Each alert in alert_candidates:INSERT INTO alerts (sku_id,alert_type,old_value,new_value,change_pct,threshold) VALUES (alert.sku_id,alert.type,alert.old,alert.new,alert.pct,10.0) return new_count,changed_count

模块3:变动告警

=========================================== 子流程:send_alerts ===========================================# 查询今天未发送的告警pending_alerts = SELECT * FROM alerts WHERE notified = 0 AND DATE(created_at) = DATE('now') ORDER BY ABS(change_pct) DESCif pending_alerts 为空:return# 没有需要告警的# 组装告警消息message_parts =["🔔 **竞品价格变动告警**\n"]message_parts.append(f"⏰ 检测时间:{当前时间}\n") message_parts.append("---\n")For Each alert in pending_alerts:# 查询商品名称product = SELECT name FROM products WHERE sku_id = alert.sku_idif alert.alert_type == "price_up":emoji = "📈" direction = "↑上涨"elif alert.alert_type == "price_down":emoji = "📉" direction = "↓下降"else:emoji = "⚠️" direction = "变动" line = f"{emoji}**{product.name}**\n价格{direction}:¥{alert.old_value}→ ¥{alert.new_value}({alert.change_pct:+.1f}%)\n\n" message_parts.append(line)# 标记为已通知UPDATE alerts SET notified=1,notified_at=CURRENT_TIMESTAMP WHERE id=alert.id# 发送钉钉消息if len(pending_alerts) <= 10:# 少量告警:直接发完整内容dingtalk_send(webhook_url,"\n".join(message_parts))else:# 大量告警:只发摘要summary = f"共{len(pending_alerts)}条价格变动\nTOP5最大变动:\n..." dingtalk_send(webhook_url,summary)

模块4:日报生成

=========================================== 子流程:daily_report_generator =========================================== 【数据准备】# 今日各平台统计today_stats = """ SELECT platform,COUNT(*)as total,SUM(CASE WHEN price IS NOT NULL THEN 1 ELSE 0 END) as has_price,AVG(price) as avg_price,MIN(price) as min_price,MAX(price) as max_price FROM price_history WHERE DATE(collected_at) = DATE('now') GROUP BY platform """# 价格变动汇总(今日 vs 昨日)change_summary = """ SELECT today.platform,today.avg_price as today_avg,yesterday.avg_price as yesterday_avg,CASE WHEN yesterday.avg_price>0 THEN ((today.avg_price-yesterday.avg_price) / yesterday.avg_avg * 100) ELSE NULL END as pct_change FROM (SELECT platform,AVG(price) as avg_price FROM price_history WHERE DATE(collected_at)=DATE('now') GROUP BY platform) today LEFT JOIN (SELECT platform,AVG(price) as avg_price FROM price_history WHERE DATE(collected_at)=DATE('now','-1 day') GROUP BY platform) yesterday ON today.platform = yesterday.platform """# TOP20 最低价商品(可能有促销机会)top_bargains = SELECT p.name,ph.price,p.shop_name,p.product_url FROM price_history ph JOIN products p ON ph.sku_id = p.sku_id WHERE DATE(ph.collected_at) = DATE('now') AND ph.price IS NOT NULL ORDER BY ph.price ASC LIMIT 20 【创建 Excel 报表】 新建Excel "D:/reports/电商竞品监控日报_{日期}.xlsx"Sheet1 "今日概览":┌─────────────────────────────────────────────┐ │ 📊 电商竞品监控日报 │ │ 日期:2026年6月10日 星期三 │ │ 生成时间:08:30 │ ├─────────────────────────────────────────────┤ │ │ │ 【各平台数据概览】 │ │ 平台|采集量|有价格|平均价|最高|最低│ │ 淘宝|485|472|¥189|¥2999|¥9.9│ │ 京东|298|295|¥256|¥5999|¥19 │ │ 拼多多|392|388|¥67|¥1599|¥5.5 │ │ 合计|**1175**|**1155**│---|---|---│ ├─────────────────────────────────────────────┤ │ 📈 价格变动:涨 23 个 / 跌 15 个 / 新品 8 个 │ │ ⚠️ 库存预警:3个商品显示缺货 │ └─────────────────────────────────────────────┘Sheet2 "全部数据":→ 导出今日全部价格记录(供深度分析用)Sheet3 "变动明细":→ 列出所有价格变动的商品及变化幅度Sheet4 "最低价TOP20":→ 列出最值得关注的低价/促销商品 + 条件格式: → 涨价的标红色 → 降价的标绿色 → 缺货的标灰色 + 插入图表: → 各平台平均价格对比柱状图 → 近7天价格趋势折线图 保存Excel 【发送邮件】 send_email( to =["boss@co.com","ops@co.com"],subject = "电商竞品监控日报-{今天}",body = "各位好,附件为今日竞品价格监控报告。共发现{changed_count}个价格变动...",attachment = "日报文件路径" )

四、主控流程组装

=========================================== 主流程:ecommerce_monitor_system 调度时间:每天 07:00 开始执行 ========================================== start_time = 当前时间() log("="*60)log(f"开始执行|{start_time}") log("="*60)【Phase 1:采集各平台数据】 all_raw_data ={}all_stats ={}# 依次采集三个平台For Each platform in["taobao","jd","pdd"]:log(f"--- 开始采集:{platform}---")# 获取该平台的SKU列表(从配置文件或数据库读取)sku_list = load_sku_config(platform)# 调用采集子流程raw_data,stats = platform_collector(platform,sku_list) all_raw_data[platform]= raw_data all_stats[platform]= stats log(f"{platform}完成:成功{stats['success']},失败{stats['failed']}")【Phase 2:合并并清洗入库】 total_new = 0 total_changed = 0 For Each platform in["taobao","jd","pdd"]:new_cnt,changed_cnt = data_pipeline(all_raw_data[platform],platform) total_new += new_cnt total_changed += changed_cntlog(f"入库完成:新增{total_new},变动{total_changed}")【Phase 3:发送告警】 send_alerts()【Phase 4:生成并发送日报】 daily_report_generator()【Phase 5:记录运行日志】 end_time = 当前时间() duration = (end_time-start_time).总秒数 total_success = sum(s["success"]for s in all_stats.values()) total_failed = sum(s["failed"]for s in all_stats.values()) INSERT INTO run_logs ( run_date,status,total_items,success_items,failed_items,new_items,changed_items,duration_seconds ) VALUES ( DATE('now'),'success' if total_failed == 0 else 'partial',total_success + total_failed,total_success,total_failed,total_new,total_changed,duration ) log("="*60)log(f"执行完毕|耗时{duration:.1f}|成功{total_success}失败{total_failed}") log("="*60)

五、部署与运维

5.1 配置文件管理

# config.yaml — 所有可配置参数集中管理database:path:"D:/rpa_data/ecommerce.db"platforms:taobao:enabled:trueaccount:"your_taobao_account"delay_range:[3,8]# 秒jd:enabled:trueaccount:"your_jd_account"delay_range:[2,6]pdd:enabled:trueaccount:"your_pdd_account"delay_range:[5,12]# 拼多多反爬严格,延迟更长![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/8ba5a2ccf1c34ef4baffb500e0186c9a.png#pic_center)alerts:price_change_threshold:10# 百分比dingtalk_webhook:"https://oapi.dingtalk.com/robot/send?access_token=xxx"email:smtp:"smtp.qq.com:465"sender:"robot@company.com"password:"授权码"recipients:["boss@company.com","ops@company.com"]report:output_dir:"D:/reports/"retain_days:90# 保留最近90天的报表

5.2 定时任务配置

任务名称:电商监控系统 Cron表达式:0 7 * * * (每天早上7点整) 超时设置:60分钟(3个平台全采完通常需要30~45分钟) 重试策略: 整体失败时:次日正常执行即可(不需要立即重试) 单平台失败:不影响其他平台继续运行 告警规则: 连续2天执行失败 → 电话通知运维 单次成功率 < 80% → 邮件通知负责人 总耗时超过50分钟 → 钉钉群提醒性能退化

5.3 监控面板关键指标

TEMU店群如何管理运营?

╔══════════════════════════════════════╗ ║ 🖥️ 电商监控系统看板 ║ ╠══════════════════════════════════════╣ ║ ║ ║ 📊 今日运行状态 ║ ║ ┌──────────────────────────────┐ ║ ║ │ ████████████████████░░░░ 85% │ ║ ║ │ 成功率 85% (1000/1175) │ ║ ║ └──────────────────────────────┘ ║ ║ ║ ║ 📈 各平台情况 ║ ║ 淘宝: ✅ 485/500 (97%) 耗时12min ║ ║ 京东: ✅ 298/300 (99%) 耗时8min ║ ║ 拼多多: ⚠️ 392/420 (93%) 耗时18min ║ ║ ║ ║ 🔔 待处理告警:5条 ║ ║ ┌─ 3条涨价 / 2条降价 ║ ║ ║ ║ 📅 连续运行天数:28天 ✅ ║ ║ 上次故障:6月5日(网络超时)已恢复 ║ ║ ║ ╚══════════════════════════════════════╝

六、项目总结

6.1 本项目覆盖的全部技能点

技能矩阵: 基础操作: ✅ 打开网页 / 元素捕获 / 点击 / 输入文本 / 获取文本 数据处理: ✅ 循环遍历 / For Each / 列表操作 / 字典操作 ✅ 字符串处理 / 正则提取数字 / 类型转换 数据库: ✅ SQLite 建表 / CRUD / 事务 / 批量插入 ✅ SQL聚合查询 / JOIN / 子查询 异常处理: ✅ Try-Catch / 重试机制 / 日志记录 / 降级方案 办公自动化: ✅ Excel 创建/写入/样式/图表/条件格式 ✅ 邮件发送(HTML)/附件管理 通信通知: ✅ 钉钉Webhook推送 / Markdown格式消息 工程能力: ✅ 子流程模块化 / 参数传递 / 配置文件分离 ✅ 定时调度 / 运行日志 / 监控告警

6.2 项目扩展方向

当前版本可以继续演进的方向: 1. 🤖 AI 增强 用大模型自动分析价格变动原因 生成自然语言的每日简报摘要 2. 📱 可视化大屏 搭建 Web 仪表盘实时展示数据 支持 PC/手机随时查看 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/ebab4a43d5d941bfbece790e5b54dfbc.png#pic_center) 3. 🔄 API 开放 提供 REST API 给其他系统调用 对接企业内部 ERP/OA 系统 4. 📈 预测模型 基于历史数据预测未来价格走势 为采购决策提供数据支撑

七、全文系列回顾

恭喜你完成了本篇收官之作!至此,你已经拥有了一套完整的影刀RPA知识体系

从入门到精通的学习路径: Day 1-2: 零基础入门 → 第一个自动化流程 ✅ Day 3-5: 元素捕获 → 子流程 → 异常处理 ✅ Day 6-8: 数据采集 → Excel操作 → 数据库 ✅ Day 9-10: 定时任务 → 邮件自动化 → 调试技巧 ✅ Day 11+: 综合项目实战 → 你在这里 ✨

下一步?把这套系统真正跑起来吧!

从本文的项目中选取适合你的部分开始实施——哪怕只是先做一个「单平台+单功能」的最小版本。RPA 的精髓不在于学了多少理论,而在于动手做出第一个能跑的生产级流程。

💡最后一句忠告:先让流程跑通,再让它跑快,最后让它跑稳。不要一开始就追求完美——完美是在迭代中产生的。

http://www.jsqmd.com/news/1044833/

相关文章:

  • 黄山GEO服务商代理加盟选型哪家靠谱推荐?2026年黄山GEO优化服务商代理加盟排名与合作路径更新 - 小随科技
  • 2026东莞高埗企业法律顾问靠谱律所推荐(5家精选) - GrowthUME
  • 终极指南:如何轻松实现《塞尔达传说:旷野之息》WiiU与Switch存档转换
  • 2026武汉中职择校指南|武汉光谷科技职业技术学校领跑,全国唯一“海陆空”实训基地+17大热门专业对比,避坑指南 - GrowthUME
  • 家装选购开店加盟参考|2026主流软装品牌三维度实力解析榜单 - 速递信息
  • 优质国际EMBA测评:科学选型与差异化对比指南 - 品牌2026推荐
  • 斑斑AI低代码 vs 搭贝:企业低代码平台深度对比分析
  • Tp on
  • 深入解析MCF5206片选模块:嵌入式系统总线访问与多主架构设计
  • Skills实战之 - 首个技能开发(实战演练:用 10 行代码让 AI 学会自定义文件批量重命名)
  • Day48
  • 掌握gInk屏幕标注:免费开源工具的终极使用指南
  • 2026年无锡GEO服务商代理加盟选型指南丨无锡GEO代理服务商靠谱推荐与合作权益深度解析 - 科技快讯
  • BiliTools:终极跨平台B站工具箱,一站式解决视频下载与智能管理难题
  • Input Leap:如何通过跨平台输入虚拟化技术重构多设备工作流?
  • 2026东莞洪梅工厂法律咨询|5家优质本地律所盘点(首选推荐) - GrowthUME
  • Th1 o
  • pandas多维聚合与滚动计算的生产级实践
  • 零代码跨平台UI自动化实践:Midscene.js核心原理与场景驱动开发
  • 2026年6月丨外观设计源头厂家推荐,助力产品脱颖而出 - GrowthUME
  • 2026长春防水补漏维修团队实测盘点TOP4:长春业主房屋渗漏修缮靠谱选择 - 宅安选房屋修缮
  • MC68HC908RFRK2:经典8位MCU架构解析与低功耗无线应用实战
  • 2026年6月比较好的真空煅烧炉厂家推荐,维护简便结构合理保养成本更低 - 品牌推荐师
  • Th1 +
  • GPT-4的2%激活率:MoE架构的工程本质与实战落地
  • 2026长春非急救转运救护车TOP5盘点|吉黑辽跨省、长白山地、院区转诊首选康跃转运 - 吉修匠
  • 2026.5.24
  • 苏州 GEO 优化公司怎么选?实测对比后,优先推荐企优托一网推王超团队 - 新闻快传
  • Gemma 4部署全指南:Apache 2.0开源模型的全设备多模态实战
  • 2026东莞桥头靠谱法律顾问律所推荐|聚焦企业风险前置法律服务(5家精选) - GrowthUME