当前位置: 首页 > news >正文

状态机——事件流中带时间窗口的事件合成

事件流中带时间窗口的事件合成

    • 1、问题概述
    • 2、核心设计
    • 3、代码实现
    • 3、测试

1、问题概述

引入时间窗口(Time Window)是分布式事件流处理(如 Flink/Spark Streaming)中非常经典的场景。

加入时间窗口意味着:子事件不仅要满足逻辑表达式,而且所有参与合成的子事件,其发生的时间差不能超过规定的范围(例如 5 分钟内)。

一旦超出这个窗口,之前收到的过期事件应当被“清空”或“失效”,状态机需要重新等待新的事件流。

要在我们现有的解析器和逻辑树架构上优雅地实现它,最有效的方法是引入一个“带有生命周期的事件注册表(Registry)”,并让叶子节点去这个注册表里查询事件是否有效。

2、核心设计

  1. 带时间戳的事件:传入的不再仅仅是一个 eventId,而是一个包含了 timestamp 的事件对象。
  2. 全局事件时间中心(EventRegistry):统一维护当前窗口内活跃的事件及其发生时间。每次新事件进来时,自动清理掉那些已经和当前最新事件时差超过窗口大小的“老事件”。
  3. 主状态机逻辑:当一个新事件导致窗口滑动、老事件失效时,状态机能实时感知并让逻辑树重新评估。

3、代码实现

第一步:定义事件与事件注册表(带滑窗清理)

// 1. 定义事件对象,包含 ID 和发生的时间戳publicclassTimeEvent{finalintid;finallongtimestamp;// 毫秒publicTimeEvent(intid,longtimestamp){this.id=id;this.timestamp=timestamp;}}// 2. 时间窗口注册表:负责管理哪些事件在窗口内有效classTimeWindowRegistry{privatefinallongwindowSizeMs;// 窗口大小(毫秒)// 记录窗口内每个事件 ID 对应的最新发生时间privatefinalMap<Integer,Long>activeEvents=newHashMap<>();publicTimeWindowRegistry(longwindowSizeMs){this.windowSizeMs=windowSizeMs;}// 激活一个事件,并根据当前最新时间,滑窗清理过期事件publicvoidregisterEvent(TimeEventevent){longcurrentLatestTime=event.timestamp;activeEvents.put(event.id,currentLatestTime);// 滑动窗口的核心:移除所有与当前最新事件时间差超过 windowSizeMs 的旧事件activeEvents.entrySet().removeIf(entry->(currentLatestTime-entry.getValue())>windowSizeMs);}// 检查某个事件当前是否在窗口内有效publicbooleanisEventActive(inteventId){returnactiveEvents.containsKey(eventId);}publicvoidclear(){activeEvents.clear();}}

第二步:改造叶子节点(从静态标记改为动态查询)

以前的 EventLeafNode 是自己内部存一个 triggered = true。现在,它应该去 TimeWindowRegistry 里动态查询自己关心的事件当前是否还在窗口内。

publicinterfaceLogicNode{booleanisSatisfied();// 当前节点是否已满足voidtrigger(inteventId);// 触发子事件voidreset();// 重置状态}// 改造后的叶子节点classTimeEventLeafNodeimplementsLogicNode{privatefinalinteventId;privatefinalTimeWindowRegistryregistry;// 注入注册表的引用publicTimeEventLeafNode(inteventId,TimeWindowRegistryregistry){this.eventId=eventId;this.registry=registry;}@OverridepublicbooleanisSatisfied(){// 动态判断:只有在时间窗口内活跃,才算满足returnregistry.isEventActive(eventId);}@Overridepublicvoidtrigger(intid){// 触发逻辑交由全局注册表通过 feedEvent 统一调度,这里无需单独处理}@Overridepublicvoidreset(){// 注册表清空即可}}// 2. 与节点(And):所有子节点必须全部满足classAndNodeimplementsLogicNode{privatefinalList<LogicNode>children=newArrayList<>();publicAndNode(LogicNode...nodes){children.addAll(Arrays.asList(nodes));}@OverridepublicbooleanisSatisfied(){returnchildren.stream().allMatch(LogicNode::isSatisfied);}@Overridepublicvoidtrigger(inteventId){children.forEach(child->child.trigger(eventId));}@Overridepublicvoidreset(){children.forEach(LogicNode::reset);}}// 3. 或节点(Or):子节点满足其一即可classOrNodeimplementsLogicNode{privatefinalList<LogicNode>children=newArrayList<>();publicOrNode(LogicNode...nodes){children.addAll(Arrays.asList(nodes));}@OverridepublicbooleanisSatisfied(){returnchildren.stream().anyMatch(LogicNode::isSatisfied);}@Overridepublicvoidtrigger(inteventId){children.forEach(child->child.trigger(eventId));}@Overridepublicvoidreset(){children.forEach(LogicNode::reset);}}

第三步:升级解析器(支持注入注册表)
为了配合新的叶子节点,解析器在生成树时,要把 TimeWindowRegistry 传给每一个叶子节点。

publicclassTimeRuleParser{publicstaticLogicNodeparse(Stringexpression,TimeWindowRegistryregistry){expression=expression.replaceAll("\\s+","");Stack<LogicNode>nodes=newStack<>();Stack<Character>operators=newStack<>();inti=0;while(i<expression.length()){charc=expression.charAt(i);if(Character.isDigit(c)){StringBuildersb=newStringBuilder();while(i<expression.length()&&Character.isDigit(expression.charAt(i))){sb.append(expression.charAt(i));i++;}// 【核心改动】创建叶子节点时,注入时间窗口注册表nodes.push(newTimeEventLeafNode(Integer.parseInt(sb.toString()),registry));continue;}elseif(c=='('){operators.push(c);}elseif(c==')'){while(!operators.isEmpty()&&operators.peek()!='('){executeOperator(nodes,operators.pop());}operators.pop();}elseif(c=='&'||c=='|'){while(!operators.isEmpty()&&precedence(operators.peek())>=precedence(c)){executeOperator(nodes,operators.pop());}operators.push(c);}i++;}while(!operators.isEmpty()){executeOperator(nodes,operators.pop());}returnnodes.pop();}privatestaticintprecedence(charop){if(op=='&')return2;if(op=='|')return1;return0;}privatestaticvoidexecuteOperator(Stack<LogicNode>nodes,charop){LogicNoderight=nodes.pop();LogicNodeleft=nodes.pop();if(op=='&')nodes.push(newAndNode(left,right));elseif(op=='|')nodes.push(newOrNode(left,right));}}

第四步:时间窗口状态机上下文

publicclassTimeStateMachine{//主状态privateMainStatecurrentState=MainState.INIT;//抽象语法树根节点privatefinalLogicNoderootCondition;//时间窗口privatefinalTimeWindowRegistryregistry;publicTimeStateMachine(LogicNoderootCondition,TimeWindowRegistryregistry){this.rootCondition=rootCondition;this.registry=registry;}// 接收带时间戳的事件publicsynchronizedvoidfeedEvent(TimeEventevent){if(currentState==MainState.COMPLETED)return;currentState=MainState.PROCESSING;// 1. 将新事件注册进去(内部会自动触发滑窗,淘汰超时事件)registry.registerEvent(event);System.out.println(String.format("收到事件 [%d],发生时间: %dms",event.id,event.timestamp));// 2. 让逻辑树基于当前窗口内的有效事件重新进行整体评估if(rootCondition.isSatisfied()){this.currentState=MainState.COMPLETED;System.out.println("🎉 【成功】在时间窗口内满足公式!流转至 -> "+currentState);}else{System.out.println("-> 当前窗口内未满足公式,继续等待...");}}publicMainStategetCurrentState(){returncurrentState;}}

3、测试

我们设置时间窗口为 5000毫秒(5秒),规则依然是 (1 & 2 & (3 | 4)) & 5。

测试场景 A:由于事件 1 超时,导致合成失败

publicstaticvoidmain(String[]args){StringruleStr="(1 & 2 & (3 | 4)) & 5";// 1. 定义一个 5 秒的时间窗口注册表TimeWindowRegistryregistry=newTimeWindowRegistry(5000);LogicNoderoot=TimeRuleParser.parse(ruleStr,registry);TimeStateMachinestateMachine=newTimeStateMachine(root,registry);System.out.println("--- 模拟场景 A:事件 1 超时被滑出窗口 ---");stateMachine.feedEvent(newTimeEvent(1,1000));// 1 秒时发生事件 1stateMachine.feedEvent(newTimeEvent(5,2000));// 2 秒时发生事件 5stateMachine.feedEvent(newTimeEvent(4,3000));// 3 秒时发生事件 4// 关键点:等到第 7 秒才发生事件 2。// 此时最新时间是 7000,窗口范围是 [2000, 7000],1000ms 发生的事件 1 已经被无情滑出!stateMachine.feedEvent(newTimeEvent(2,7000));System.out.println("最终状态: "+stateMachine.getCurrentState());// 依然是 PROCESSING}
--- 模拟场景 A:事件 1 超时被滑出窗口 --- 收到事件 [1],发生时间: 1000ms -> 当前窗口内未满足公式,继续等待... 收到事件 [5],发生时间: 2000ms -> 当前窗口内未满足公式,继续等待... 收到事件 [4],发生时间: 3000ms -> 当前窗口内未满足公式,继续等待... 收到事件 [2],发生时间: 7000ms -> 当前窗口内未满足公式,继续等待... 最终状态: PROCESSING

测试场景 B:所有事件紧凑发生,成功合成

System.out.println("\n--- 模拟场景 B:5秒窗口内紧凑发生,合成成功 ---");// 重置状态机进行新测试TimeWindowRegistryregistryB=newTimeWindowRegistry(5000);TimeStateMachinestateMachineB=newTimeStateMachine(TimeRuleParser.parse(ruleStr,registryB),registryB);stateMachineB.feedEvent(newTimeEvent(1,1000));// 1sstateMachineB.feedEvent(newTimeEvent(5,2000));// 2sstateMachineB.feedEvent(newTimeEvent(4,3000));// 3sstateMachineB.feedEvent(newTimeEvent(2,4500));// 4.5s -> 最新窗口 [0, 4500],事件 1 还在!
--- 模拟场景 B:5秒窗口内紧凑发生,合成成功 --- 收到事件 [1],发生时间: 1000ms -> 当前窗口内未满足公式,继续等待... 收到事件 [5],发生时间: 2000ms -> 当前窗口内未满足公式,继续等待... 收到事件 [4],发生时间: 3000ms -> 当前窗口内未满足公式,继续等待... 收到事件 [2],发生时间: 4500ms 🎉 【成功】在时间窗口内满足公式!流转至 -> COMPLETED
http://www.jsqmd.com/news/843539/

相关文章:

  • 如何一键检测微信单向好友:WechatRealFriends微信好友关系检测工具完整指南
  • 基于TensorFlow.js与Colab的浏览器端实时目标检测实践
  • 算力基石:CPU、GPU与嵌入式AI的技术逻辑与融合发展
  • NotebookLM重复检测失效真相:为什么92%的用户漏掉这4个关键配置参数?
  • iPhone内移植RFID公交卡:破解金属屏蔽,实现物理刷卡
  • 为什么你的NotebookLM总漏掉核心结论?资深技术传播者揭秘“语义锚定”生成法(仅限前500名开发者掌握)
  • GEO优化选购指南:靠谱品牌与价格分析 - 工业品牌热点
  • 2026年冰袋冰晶粉厂家大揭秘,究竟藏着哪些行业秘密?
  • 解读 A Survey of Data Agents:AI 界的 SAE J3016
  • 日志分析这件事,有了 ELK 才能真正做到可搜索、可视化、可预警
  • LangChain 从入门到实战:大模型应用开发全流程教程
  • 声源定位技术与GCC-PHAT算法详解
  • 【嵌入式 AI 实战第11 期】人体运动与姿态识别(可穿戴应用实战)基于 STM32+MPU6050+TinyML 部署
  • Maven多模块项目里,程序运行时如何优雅地获取自己的版本号?3种方案实测对比
  • 百度网盘直链解析:如何绕过限速实现高速下载的Python实战指南
  • 对比自行维护与使用Taotoken聚合API在稳定性上的体感差异
  • 基于CircuitPython与LED点阵屏的物联网新闻显示器制作指南
  • 优先队列和单调队列的浅浅学习
  • 别再手动激活了!CentOS 7下VCS+SCL开机自启动保姆级配置(含防火墙设置)
  • 3步解锁Wallpaper Engine壁纸资源:RePKG终极提取指南
  • 从零打造动画电子猫:Arduino与针毡工艺的创客实践
  • 金华装修避坑指南/装修有哪些最容易踩的坑?
  • 手把手教你用TMS320F2803x DSP实现PMBus通信(附代码下载与避坑指南)
  • NotebookLM概念关联分析深度拆解(20年NLP专家亲测有效的7层推理模型)
  • 从硬盘到网络:手把手拆解Linux/Windows下SCSI协议栈的完整工作流程
  • GPT时代下非端到端AI方案的融合价值与混合架构实践
  • XUnity自动翻译器:Unity游戏跨语言无障碍体验的完整指南
  • 基于CircuitPython的多传感器物联网环境监测盒设计与实现
  • 【JavaSE全面教学】Java异常处理机制Day11(2026年)
  • XUnity.AutoTranslator:三步实现Unity游戏实时翻译的终极解决方案