当前位置: 首页 > news >正文

RocketMQ Streams 1.1.0: 轻量级流处理再出发

本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer

01 背景

RocketMQ Streams是一款基于RocketMQ为基础的轻量级流计算引擎,具有资源消耗少、部署简单、功能全面的特点,目前已经在社区开源。RocketMQ Streams在阿里云内部被使用在对资源比较敏感,同时又强烈需要流计算的场景,比如在自建机房的云安全场景下。

自RocketMQ Streams开源以来,吸引了大量用户调研和试用。但是也存在一些问题,在RocketMQ Streams 1.1.0中,主要针对以下问题做出了改进和优化。

1、面向用户API不够友好,不能使用泛型,不支持自定义序列化/反序列化;

2、代码冗余,在RocketMQ Streams中存在将流处理拓扑序列化反序列化模块,RocketMQ Streams作为轻量级流处理SDK,构建好流处理节点之后应该可以直接处理数据,不存在将流处理拓扑图本地保存或者网络传输需求。

3、流处理过程不容易理解,含有大量缓存、刷新逻辑;

4、存在大量支持SQL的代码,这部分和SDK方式运行流处理任务的逻辑无关;

在RocketMQ Streams 1.1.0中,对上述问题做出了改进,期望能带来更好的使用体验。同时,重新设计了流处理拓扑构建过程、去掉冗余代码,使得代码更容易被理解。

从今天起,将推出系列文章介绍RocketMQ Streams 1.1.0版本,本次文章主要介绍RocketMQ Streams 1.1.0的API如何使用,如何利用RocketMQ Streams快速构建流处理应用。

02 典型使用示例

本地运行下列示例的步骤:

1、部署RocketMQ 5.0;

2、使用mqAdmin创建topic;

3、构建示例工程,添加依赖,启动示例。RocketMQ Streams 坐标:

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> <version>1.1.0</version> </dependency>

4、向topic中写入相应数据,并观察结果。

更详细文档请参考:https://github.com/apache/roc...

WordCount

public class WordCount { public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("wordCount"); builder.source("sourceTopic", total -> { String value = new String(total, StandardCharsets.UTF_8); return new Pair<>(null, value); }) .flatMap((ValueMapperAction<String, List<String>>) value -> { String[] splits = value.toLowerCase().split("\W+"); return Arrays.asList(splits); }) .keyBy(value -> value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { @Override public void run() { rocketMQStream.stop(); latch.countDown(); } }); try { rocketMQStream.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); } }

WordCount示例要点:

1、JobId wordCount唯一标识流处理任务;

2、自定义的反序列化;

3、一对多转化;

4、lambda形式从数据中指定Key;

5、支持有状态计算;

窗口聚合

public class WindowCount { public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("windowCountUser"); AggregateAction<String, User, Num> aggregateAction = (key, value, accumulator) -> new Num(value.getName(), 100); builder.source("user", source -> { User user1 = JSON.parseObject(source, User.class); return new Pair<>(null, user1); }) .selectTimestamp(User::getTimestamp) .filter(value -> value.getAge() > 0) .keyBy(value -> "key") .window(WindowBuilder.tumblingWindow(Time.seconds(15))) .aggregate(aggregateAction) .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME); properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); rocketMQStream.start(); } }

窗口聚合示例要点:

1、支持指定时间字段;

2、支持滑动、滚动、会话多种类型window;

3、支持自定义UDAF类型聚合;

4、支持自定义时间类型和数据最大迟到时间;

双流JOIN

public class JoinWindow { public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("joinWindow"); //左流 RStream<User> user = builder.source("user", total -> { User user1 = JSON.parseObject(total, User.class); return new Pair<>(null, user1); }); //右流 RStream<Num> num = builder.source("num", source -> { Num user12 = JSON.parseObject(source, Num.class); return new Pair<>(null, user12); }); //自定义join后的运算 ValueJoinAction<User, Num, Union> action = new ValueJoinAction<User, Num, Union>() { @Override public Union apply(User value1, Num value2) { ... } }; user.join(num) .where(User::getName) .equalTo(Num::getName) .window(WindowBuilder.tumblingWindow(Time.seconds(30))) .apply(action) .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); rocketMQStream.start(); } }

双流聚合示例要点:

1、支持window join和非window join,对于非window join,只需要在上述及连表达式中去掉window即可;

2、支持多种窗口类型的window join;

3、支持对join后数据自定义操作;

03 参与贡献

RocketMQ Streams是Apache RocketMQ的子项目,已经在社区开源,参与RocketMQ Streams相关工作,请参考以下资源:

1、试用RocketMQ Streams,并阅读相关文档以了解更多信息;

maven仓库坐标:

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> <version>1.1.0</version> </dependency>

RocketMQ Streams文档:

https://rocketmq.apache.org/z...

2、参与贡献:如果你有任何功能请求或错误报告,请随时提交 Pull Request 来分享你的反馈和想法;

社区仓库:

https://github.com/apache/roc...

3、联系我们:可以在 GitHub上创建 Issue,向 RocketMQ 邮件列表发送电子邮件,或在 RocketMQ Streams SIG 交流群与专家共同探讨,RocketMQ Streams SIG加入方式:添加“小火箭”微信,回复RocketMQ Streams。

http://www.jsqmd.com/news/746097/

相关文章:

  • XUnity.AutoTranslator完全指南:如何5分钟实现Unity游戏实时自动翻译
  • 扣图公章用什么工具?2026年最全的免费抠图工具推荐指南
  • 鼠标连点器:游戏玩家的得力助手
  • PeachPy未来展望:汇编编程的发展趋势与创新方向
  • 保姆级教程:ROS2 Humble下用rs_launch.py调通你的RealSense D435i(含点云与配准配置)
  • 10分钟掌握AI变声魔法:用RVC WebUI打造专属数字声线
  • 如何永久免费使用Cursor AI Pro功能:终极破解工具完整指南
  • 【2026最新|收藏】大模型落地实战:从认知启蒙到企业赋能,小白/程序员必看
  • ESP32广播/GATT整理
  • 软件评测师基础知识专项刷题:网络安全技术(一)
  • Java科学计算新纪元已开启,TensorFlow Java绑定即将淘汰?——基于Vector API重构矩阵乘法的4.8倍加速实录
  • APK Installer三步法:Windows平台零门槛安装Android应用的突破性方案
  • 【收藏级】2026年Java程序员转行大模型开发全面指南(小白/程序员必看)
  • 密封类取代if-else和Visitor模式,性能提升47%?——基于JMH压测的Java 25真实基准报告
  • BitNet b1.58-GGUF快速部署:单命令supervisord启动+健康检查脚本编写
  • Chaplin:本地化实时唇语识别完整指南,5分钟开启无声语音革命
  • Java 数组必知:Arrays.toString 到底什么时候用
  • 5个技巧快速掌握macOS系统级音频均衡器eqMac的完整使用指南
  • 05 - AMDGPU中的VRAM管理器
  • GPT-SoVITS如何通过边缘计算优化实现毫秒级实时语音合成?
  • 从CREO到URDF:机器人开发的终极自动化转换指南
  • XXMI Launcher终极指南:一站式米哈游游戏模组管理神器
  • 如何构建macOS菜单栏管理系统:5个关键技术突破
  • PeachPy社区贡献指南:从用户到开发者的成长路径
  • 别再只用单片机点灯了!用Multisim仿真4017+运放,体验纯硬件流水灯的乐趣
  • 网盘直链解析助手:八大平台高效下载的完整解决方案
  • Phi-4-mini-reasoning商业应用:智能客服中复杂问题归因分析模块
  • php把运行时重构成常驻内存 + 多进程 + 事件驱动(Reactor) 模式完整流程=workerman
  • WinAppDriver环境搭建避坑大全:解决.NET依赖、版本冲突和‘找不到元素’的常见问题
  • Python模型配置统一管理方案(企业级配置中心落地全图谱)