当前位置: 首页 > news >正文

基于用户行为的动态标签与SOP触发引擎

一、问题背景

技术背景说明

教育私域运营中,用户从加好友到正价课成交通常经历多个阶段:兴趣期→咨询期→试听期→犹豫期→成交期。每个阶段需要不同的运营策略。例如:

  • 咨询3次未购买 → 标记“高意向-未转化”并推送限时优惠

  • 试听完成3天未购课 → 触发“限时优惠+往期学员案例”组合消息

  • 30天未登录 → 发送“专属学习报告+续费礼包”激活

企微官方限制

官方API仅提供基础的客户标签读写能力,无法实现:

  • 实时行为捕获:无法获取用户在微信端的“微行为”(如阅读公众号文章、点击朋友圈链接)

  • 动态规则计算:不支持基于多条件组合(如“咨询次数>3且最近咨询<24h且未付费”)的自动标签变更

  • SOP任务编排:缺乏内置的定时触发、条件分支等自动化工作流引擎

为什么需要技术手段

需要构建独立的“用户行为分析引擎”,通过API对接第三方协议层获取全量行为数据,利用规则引擎(Drools/EasyRules)或脚本实现实时计算,并通过消息队列驱动SOP任务执行。

二、技术方案

架构设计
行为数据源(iPad协议/小程序/公众号) →Kafka行为流Flink/Spark实时计算规则引擎标签更新SOP触发企微触达

技术选型

  • 消息队列:Kafka(高吞吐、持久化)

  • 实时计算:Apache Flink(CEP复杂事件处理)

  • 规则引擎:Groovy动态脚本(热加载、灵活配置)

  • 数据存储:Redis(实时状态)+ MySQL(历史记录)

方案对比

方案

实时性

规则灵活性

开发复杂度

适用场景

定时SQL扫描

小时级

离线分析,非实时运营

Python脚本轮询

分钟级

中小机构,规则简单

Flink+Groovy

秒级

大型机构,复杂动态规则

三、实现步骤

步骤1:行为数据采集

通过企销宝iPad协议API订阅用户行为事件:

python

# behavior_subscriber.py - 订阅用户行为 import requests import json import pika import time QXB_API = "http://your-qxb-server:8080/api" APP_KEY = "your_app_key" # 获取最近的行为事件(实际场景可使用长轮询或WebSocket) def poll_behaviors(): url = f"{QXB_API}/behavior/pull" params = { 'app_key': APP_KEY, 'last_seq': get_last_seq(), # 从Redis获取上次处理的位置 'limit': 100 } resp = requests.get(url, params=params) events = resp.json().get('events', []) for event in events: # 发送到Kafka send_to_kafka('user_behaviors', json.dumps(event)) update_last_seq(events[-1]['seq'] if events else get_last_seq()) def send_to_kafka(topic, message): # Kafka生产者代码(略) pass

步骤2:Flink实时计算任务

java

// BehaviorRuleJob.java - Flink实时规则计算 import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; public class BehaviorRuleJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从Kafka消费行为数据 FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>( "user_behaviors", new SimpleStringSchema(), getKafkaProps() ); DataStream<String> stream = env.addSource(consumer); // 定义规则:连续3次咨询未购买 Pattern<String, ?> pattern = Pattern .<String>begin("first") .where(new SimpleCondition<String>() { @Override public boolean filter(String event) throws Exception { JSONObject obj = new JSONObject(event); return "consult".equals(obj.getString("type")); } }) .times(3) .followedBy("no_purchase") .where(new SimpleCondition<String>() { @Override public boolean filter(String event) throws Exception { JSONObject obj = new JSONObject(event); return "no_purchase".equals(obj.getString("user_status")); } }); // 应用规则 PatternStream<String> patternStream = CEP.pattern(stream, pattern); // 匹配结果处理 patternStream.select(new PatternSelectFunction<String, String>() { @Override public String select(Map<String, List<String>> pattern) throws Exception { // 触发规则:发送到SOP执行队列 String userId = extractUserId(pattern); sendToSopQueue("high_intent_no_purchase", userId); return "matched"; } }); env.execute("Behavior Rule Engine"); } }

步骤3:SOP任务执行器

python

# sop_executor.py - SOP任务执行 import redis import json import time import requests from threading import Thread r = redis.Redis(host='localhost', port=6379, decode_responses=True) # SOP任务配置 SOP_TASKS = { 'high_intent_no_purchase': { 'name': '高意向未转化-限时优惠', 'delay': 300, # 触发后延迟5分钟发送 'content': '注意到您最近多次咨询{course}课程,特别为您准备了一张限时优惠券...', 'tags': ['高意向-待转化'] }, 'trial_complete_3d_no_buy': { 'name': '试听完成3天未购-案例推送', 'delay': 259200, # 3天 'content': '您试听的课程已经过去3天了,很多学员反馈通过这门课...', 'tags': ['试听-需跟进'] } } def sop_worker(): """SOP任务队列消费者""" while True: # 从Redis阻塞获取任务 task_data = r.blpop('sop_queue', timeout=0) if not task_data: continue task = json.loads(task_data[1]) rule_id = task['rule_id'] user_id = task['user_id'] params = task.get('params', {}) sop_config = SOP_TASKS[rule_id] # 延迟执行 time.sleep(sop_config.get('delay', 0)) # 1. 打标签 tag_user(user_id, sop_config['tags']) # 2. 发送消息 content = sop_config['content'].format(**params) send_msg(user_id, content) # 3. 记录执行日志 log_execution(rule_id, user_id) def send_msg(user_id, content): """通过企销宝API发送消息""" requests.post(f"{QXB_API}/message/send_text", json={ 'app_key': APP_KEY, 'wxid': 'assistant_001', 'to_wxid': user_id, 'content': content }) # 启动多个worker线程 for i in range(5): t = Thread(target=sop_worker) t.daemon = True t.start()

四、最佳实践

规则管理

  • 动态热加载:将规则存储在数据库,通过Groovy脚本动态编译,支持运营人员在线调整规则而无需重启服务

  • 规则版本控制:每条规则记录版本号,执行时记录使用的规则版本,便于效果回溯

性能优化

  • 状态存储:使用Redis存储用户实时状态(如咨询次数、最后互动时间),避免每次计算都扫描历史数据

  • 窗口计算:对于“最近24小时”等时间窗口条件,使用Flink的滑动窗口功能,避免全量扫描

踩坑经验

  • 规则冲突:多条规则可能同时匹配同一用户,需定义优先级策略(如只执行优先级最高的,或按顺序执行)

  • 死循环风险:避免规则触发后产生的行为又触发了同一条规则(如发送消息→用户回复→再次触发),需在规则条件中排除机器人发送的消息

五、工具推荐

企销宝在本方案中作为行为数据源和执行终端:

  • 数据采集:提供完整的用户行为API,包括消息收发、朋友圈互动、群事件等,为规则引擎提供丰富输入

  • 多账号并发:支持数百个账号同时在线,满足大规模私域运营的数据采集需求

  • 执行终端:规则触发后,通过其API完成打标签、发消息等操作,形成闭环

http://www.jsqmd.com/news/475568/

相关文章:

  • 2026首版次高端软件申报全流程指南:中承信安权威解析
  • AutoML 的自动化边界问题
  • docker部署New-API
  • Ozon卖家必看:26年三大选品工具格局解析,谁能成为赛道效率之王
  • Java程序冷启动时CPU优化实践
  • 什么?你的C盘满了?看我怎么帮你空出100G!
  • 一天生成100条带货视频,ai短视频新生产力工具——LinkPix
  • 【Public Key Retrievalis not allowed】com.mysql.cj.exceptions.UnableToConnectException
  • C-NCAP2024 AEB VRU测试全解析
  • 白色情人节
  • 计算机毕设指南详解
  • Docker 进阶(二)Swarm
  • actxprxy.dll文件彻底修复方法 附免费的下载解决办法
  • 从零掌握 Spring AI Alibaba Skill:定义、注册与渐进式披露
  • 34岁大厂程序员被裁当场痛哭:月供2.6万!43岁被裁、赔偿金只够撑半年!
  • 小白努力学习技术,从1级升级开始 目前等级:14级(0/10)
  • FR相对层次坐标与绝对层次坐标的区别
  • RGB显示驱动MCU单片机—CH643键盘应用方案
  • 从事ar装配行业的公司有哪些
  • 〘 6-2 〙软考高项 | 第13章:项目资源管理(下)
  • Comsol 中的空气棒板电晕放电等离子体仿真:电场强度那些事儿
  • 给大家普及一下,学大模型最快的邪修路线!
  • Google官方介绍Android 16 新特性都有哪些
  • Harmonyos应用实例86. 多边形的面积:平行四边形转化动画
  • 逢年过节送晚辈什么好?这 10 件 “保送“ 级礼物闭眼入不出错
  • Git系列五:本地仓库常用命令
  • BetterNCM插件增强指南:解锁网易云音乐功能扩展新体验
  • 本地Cookie管理与隐私保护全面指南
  • 交易所-充值与提现
  • mysql安装教程