当前位置: 首页 > news >正文

ROS2 Python 教学合并版:从环境搭建到 Topic 通信实战

  1. 搭建 ROS2 Humble + WSL2 + Ubuntu 22.04 开发环境。
  2. 编写第一个 Python 节点,理解rclpyNodespin
  3. 从普通 Python 类进化到正式 ROS2 节点类。
  4. 学习多线程与回调,避免耗时任务卡死节点。
  5. 完成一个Topic 发布订阅与语音朗读项目。

一、环境搭建:ROS2 Humble + WSL2 + Ubuntu 22.04

1.1 安装思路

推荐使用 Ubuntu 22.04 搭配 ROS2 Humble。Humble 是 Ubuntu 22.04 对应的 ROS2 LTS 版本,资料多、插件丰富、稳定性适合入门和项目练习。

可以使用鱼香 ROS 的一键安装脚本:

wgethttp://fishros.com/install-Ofishros&&.fishros

这类脚本通常会完成换源、添加 ROS2 软件源、安装 ROS2 基础组件等步骤。国内网络环境下,换成清华、华为等镜像源可以明显减少下载失败和超时。

1.2 验证 ROS2 是否安装成功

ros2--version

如果能正常输出版本信息,说明ros2命令已经进入当前终端环境。

1.3 VS Code 开发环境

推荐安装这些插件:

  • WSL
  • Remote Development
  • Python
  • ROS

WSL 插件负责让 VS Code 进入 Ubuntu 子系统;Python 插件负责补全、跳转、调试;ROS 插件可以辅助识别 ROS2 工作空间、包和消息类型。


二、ROS2 第一个 Python 节点

2.1 节点最小代码

先从最小节点开始理解 ROS2 程序的骨架。

importrclpyfromrclpy.nodeimportNodedefmain():rclpy.init()# 初始化 ROS2 通信环境node=Node('py_node')# 创建节点,节点名为 py_nodenode.get_logger().info('你好 ROS2!')rclpy.spin(node)# 循环监听rclpy.shutdown()# 释放资源

2.2 关键语句解释

rclpy.init()像开机启动。它会初始化当前进程的 ROS2 通信环境,包括上下文、网络发现和底层通信资源。

Node('py_node')给程序一个节点身份。在 ROS2 网络里,节点名就像身份证,其他工具和节点可以通过这个名字识别它。

node.get_logger().info()是 ROS2 推荐的日志输出方式。它比print更适合机器人系统,因为日志会带时间戳、等级,并且能被rqt_console等工具查看。

rclpy.spin(node)是节点持续运行的关键。没有spin,Python 脚本执行完就退出;有了spin,节点会不断检查是否有消息、定时器或其他回调需要执行。

rclpy.shutdown()用于优雅退出,通知 ROS2 回收资源。


三、规范化开发:工作空间与功能包

ROS2 项目不建议把代码零散放在任意目录里,而是通过工作空间和功能包管理。

3.1 创建工作空间

mkdir-p~/ros2_ws/srccd~/ros2_ws/src

ros2_ws是工作空间,src用于存放功能包源码。

3.2 创建 Python 功能包

ros2 pkg create --build-type ament_python--licenseApache-2.0 demo_python_pkg

--build-type ament_python表示这是一个 Python 包。demo_python_pkg是包名。

3.3 编译与运行前置环境

回到工作空间根目录:

cd~/ros2_ws colcon buildsourceinstall/setup.bash

colcon build会检查依赖、执行安装脚本,把 Python 源码安装到install目录,并生成环境脚本。

source install/setup.bash会更新当前终端的环境变量,例如PATHPYTHONPATH。如果忘了执行,ros2 run可能会找不到刚刚编写的包或节点。


四、从 Python 类进化到 ROS2 节点

ROS2 Python 节点本质上通常是一个继承自Node的类。先理解普通 Python 类,再接入 ROS2,会更顺。

4.1 阶段一:普通 Python 类

classPersonNode:def__init__(self,name_value:str,age_value:int)->None:self.name=name_value self.age=age_valuedefeat(self,food_name:str):print(f"{self.name},{self.age}岁,爱吃{food_name}")

__init__是构造函数。当执行node = PersonNode(...)时,Python 会自动调用它。它常用于初始化对象属性。

self代表对象自己。没有self,变量只是函数内部的临时变量;有了self,变量会跟着对象一起存在。

4.2 阶段二:多实例验证

defmain():node=PersonNode('法外狂徒张三',18)node1=PersonNode('法外狂徒小王',89)node.eat('鱼香肉丝')node1.eat('西红柿')

同一个类可以创建多个对象,每个对象都有自己的nameage。在机器人程序里,这对应一种很常见的写法:写一个MotorController类,再创建left_motorright_motor两个实例。

4.3 阶段三:继承与super()

新建writer_node.py

fromdemo_python_pkg.person_nodeimportPersonNodeclassWriterNode(PersonNode):def__init__(self,name:str,age:int,book:str)->None:super().__init__(name,age)self.book=book

class WriterNode(PersonNode)表示WriterNode继承PersonNode。子类会自动拥有父类的方法,例如eat

super().__init__(name, age)用于调用父类构造函数。如果不调用,父类中的nameage就不会被正确初始化。

4.4 阶段四:正式成为 ROS2 节点

importrclpyfromrclpy.nodeimportNodeclassPersonNode(Node):def__init__(self,node_name:str,name_value:str,age_value:int)->None:super().__init__(node_name)self.name=name_value self.age=age_valuedefeat(self,food_name:str):self.get_logger().info(f"{self.name},{self.age}岁,爱吃{food_name}")

这里的变化有三个:

  1. PersonNode继承了 ROS2 的Node类。
  2. super().__init__(node_name)把节点注册进 ROS2 通信系统。
  3. self.get_logger().info()替代了print,输出变成 ROS2 日志。

如果不调用super().__init__(node_name),这个对象就无法真正成为 ROS2 节点,也无法调用self.get_logger()

4.5 在setup.py中注册运行入口

'console_scripts':['person_node = demo_python_pkg.person_node:main','writer_node = demo_python_pkg.writer_node:main',],

这相当于给节点注册快捷方式。当执行:

ros2 run demo_python_pkg person_node

ROS2 会根据console_scripts找到demo_python_pkg.person_node里的main函数并执行。


五、多线程下载器与回调机制

在机器人程序里,传感器读取、图片处理、网络请求、语音合成等任务可能很耗时。如果这些任务直接写在 ROS2 回调里,节点就容易卡住。

5.1 准备本地 HTTP 文件服务

创建测试小说文件:

echo"第一章 少年踏上修仙路">novel1.txtecho"第二章 学习修仙,马上就上天">novel2.txt

启动本地 HTTP 服务:

python3-mhttp.server8000--bind0.0.0.0

5.2 多线程下载器

importthreadingimportrequestsclassDownLoader:defdownload(self,url,callback_func):print(f'线程:{threading.get_ident()}开始下载')response=requests.get(url)response.encoding='utf-8'callback_func(url,response.text)defstart_download(self,url,callback_func):thread=threading.Thread(target=self.download,args=(url,callback_func))thread.start()

requests.get(url)是网络 IO 操作,可能阻塞当前线程。放到子线程后,主程序可以继续运行。

threading.Thread(target=..., args=...)表示创建一个子线程,target是子线程要执行的函数,args是传给函数的参数。

callback_func是回调函数。下载完成后,下载器主动调用它,把结果交回去。这是一种“干完活再通知你”的模式。

5.3 为什么 ROS2 节点需要理解多线程

ROS2 节点里的消息接收、定时器等逻辑都是回调。如果在某个回调里执行耗时任务,例如几秒钟的图像处理,节点可能在这段时间里无法处理新的消息。

常见处理思路有两种:

  1. 把耗时任务交给独立线程或线程池。
  2. 使用 ROS2 的多线程执行器,例如MultiThreadedExecutor

5.4 常见问题

为什么要设置response.encoding = 'utf-8'

因为中文内容在网络响应里可能无法被自动识别为 UTF-8,手动设置可以避免乱码。

为什么start_download不直接写self.download(...)

直接调用就是同步执行,主程序仍然会等待下载完成。只有通过threading.Thread包装并启动,下载才会在后台异步运行。

threading.get_ident()是什么?

它会返回当前线程 ID,常用于验证代码是否真的运行在子线程中。


六、Topic 通信实战:修仙小说自动朗读系统

这一部分把前面的节点、队列、多线程和回调串起来,完成一个 ROS2 Topic 项目:

  • 发布者:下载小说文本,把每一行发布到/novel话题。
  • 订阅者:订阅/novel话题,把收到的文本交给语音引擎朗读。

6.1 创建新的工作空间与功能包

mkdir-p~/chapt3_ws/srccd~/chapt3_ws/src ros2 pkg create demo_python_topic --build-type ament_python--dependenciesrclpy example_interfaces--licenseApache-2.0

rclpy是 Python 节点必需依赖。example_interfaces提供标准消息类型,例如String

6.2 准备小说文件服务器

在当前文件夹下新建novel1.txt

第一章 少年踏上修仙路 第二章 学习修仙,马上上天 第三章 女大三千,位列仙班

启动 HTTP 服务:

python3-mhttp.server8000--bind0.0.0.0

6.3 发布者第一版:只下载不发布

novel_pub_node.py

importrclpyfromrclpy.nodeimportNodeimportrequestsclassNovelPubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动!')defdownload(self,url):response=requests.get(url)response.encoding='utf-8'self.get_logger().info(f'下载完成:{url}, 长度:{len(response.text)}')defmain():rclpy.init()node=NovelPubNode('novel_pub')node.download('http://localhost:8000/novel1.txt')rclpy.spin(node)rclpy.shutdown()

这一版先验证节点可以启动,并且能够通过 HTTP 下载小说内容。

6.4 发布者第二版:发布 Topic

目标:把下载到的小说内容切分成行,然后定时发布到novel话题。

importrclpyfromrclpy.nodeimportNodeimportrequestsfromexample_interfaces.msgimportStringfromqueueimportQueueclassNovelPubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动!')self.novels_queue_=Queue()self.nobel_pubblisher=self.create_publisher(String,'novel',10)self.create_timer(3.0,self.timer_callback)deftimer_callback(self):ifself.novels_queue_.qsize()>0:line=self.novels_queue_.get()msg=String()msg.data=line self.nobel_pubblisher.publish(msg)self.get_logger().info(f'发布了:{msg.data}')self.novels_queue_.put(line)defdownload(self,url):response=requests.get(url)response.encoding='utf-8'forlineinresponse.text.splitlines():ifline.strip():self.novels_queue_.put(line)

核心点:

  1. create_publisher(String, 'novel', 10)创建发布者。
  2. String是消息类型,发布和订阅两端必须一致。
  3. 'novel'是话题名,ROS2 中实际显示时通常会看到/novel
  4. 10是消息队列深度,接收方处理慢时可以缓存最近的消息。
  5. create_timer(3.0, self.timer_callback)每 3 秒触发一次发布逻辑。
  6. 不要用while True持续发布,否则可能卡住节点其他功能。

6.5 订阅者:接收文本并语音朗读

安装语音引擎:

sudoaptupdatesudoaptinstallespeak-ng-ypip3installespeakng

novel_sub_node.py

importrclpyfromrclpy.nodeimportNodefromexample_interfaces.msgimportStringfromqueueimportQueueimportthreadingimporttimeimportespeakngclassNovelSubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动!')self.novels_queue_=Queue()self.nobels_subscriber_=self.create_subscription(String,'novel',self.novel_callback,10)self.speech_thread_=threading.Thread(target=self.speake_thread)self.speech_thread_.start()defnovel_callback(self,msg):self.novels_queue_.put(msg.data)self.get_logger().info(f'收到新章节:{msg.data}')defspeake_thread(self):speaker=espeakng.Speaker()speaker.voice='zh'whilerclpy.ok():ifself.novels_queue_.qsize()>0:text=self.novels_queue_.get()self.get_logger().info(f'正在朗读:{text}')speaker.say(text)speaker.wait()else:time.sleep(1)defmain():rclpy.init()node=NovelSubNode('novel_sub')rclpy.spin(node)rclpy.shutdown()

订阅者里有两个关键设计:

  1. create_subscription(String, 'novel', self.novel_callback, 10)订阅发布者的同名话题。
  2. 语音朗读放进独立线程,避免speaker.wait()卡住 ROS2 主线程。

rclpy.ok()可以感知节点是否还在运行。当按下Ctrl+C后,后台线程能跟着退出。

6.6 注册 Topic 节点入口

修改setup.py

'console_scripts':['novel_pub_node = demo_python_topic.novel_pub_node:main','novel_sub_node = demo_python_topic.novel_sub_node:main',],

如果你还在同一个教学包里整理全部案例,可以把入口统一写成:

'console_scripts':['person_node = demo_python_pkg.person_node:main','writer_node = demo_python_pkg.writer_node:main','learn_thread = demo_python_pkg.learn_thread:main','novel_pub_node = demo_python_topic.novel_pub_node:main','novel_sub_node = demo_python_topic.novel_sub_node:main',],

实际项目里,入口属于哪个包,就应写在那个包自己的setup.py中。

6.7 编译与联调

在工作空间根目录执行:

colcon buildsourceinstall/setup.bash

终端 1 启动发布者:

ros2 run demo_python_topic novel_pub_node

终端 2 启动订阅者:

sourceinstall/setup.bash ros2 run demo_python_topic novel_sub_node

如果发布者和订阅者都正常运行,订阅者会收到发布者发来的小说文本,并调用语音引擎朗读。

6.8 Topic 调试命令

查看当前 ROS2 网络中的话题:

ros2 topic list

直接查看/novel话题内容:

ros2 topicecho/novel

查看/novel发布频率:

ros2 topic hz /novel

查看发布者节点信息:

ros2nodeinfo /novel_pub

排查顺序建议:

  1. 先用ros2 topic list看有没有/novel
  2. 再用ros2 topic echo /novel看有没有数据。
  3. 如果 echo 有数据,发布者没大问题,重点查订阅者。
  4. 如果 echo 没数据,重点查发布者、下载逻辑、定时器和source install/setup.bash

七、核心知识总结

7.1 ROS2 节点生命周期

一个最小 Python 节点通常包含:

rclpy.init()node=Node('node_name')rclpy.spin(node)rclpy.shutdown()

类形式节点通常写成:

classMyNode(Node):def__init__(self):super().__init__('my_node')

7.2 工作空间运行规律

每次修改代码后,通常需要:

colcon buildsourceinstall/setup.bash

新开终端后,也要重新执行source install/setup.bash

7.3 Topic 发布订阅规律

发布者负责:

publisher=self.create_publisher(String,'topic_name',10)publisher.publish(msg)

订阅者负责:

subscriber=self.create_subscription(String,'topic_name',callback,10)

两端的消息类型和话题名必须一致。

7.4 为什么使用Queue

下载、发布、朗读的速度可能完全不同。Queue可以把这些环节解耦:

  • 下载快了,先把数据放进队列。
  • 发布按定时器慢慢取。
  • 朗读更慢,也可以按自己的节奏消费。

这样各模块互不硬等,节点更稳定。

7.5 避坑清单

  • 忘记source install/setup.bashros2 run找不到包或入口。
  • 没有rclpy.spin(node):节点启动后立刻结束。
  • 没有super().__init__(node_name):类无法真正注册为 ROS2 节点。
  • 发布和订阅话题名不一致:订阅者收不到消息。
  • 发布和订阅消息类型不一致:通信失败或行为异常。
  • 在回调里执行耗时任务:节点卡死、消息堆积。
  • 中文 HTTP 文本没设置 UTF-8:下载内容乱码。

八、学习路线建议

建议按以下顺序练习:

  1. 只运行第一个py_node,确认 ROS2 基础环境可用。
  2. 创建demo_python_pkg,练习person_nodewriter_node
  3. learn_thread.py单独理解线程和回调。
  4. 创建demo_python_topic,先写发布者,再写订阅者。
  5. ros2 topic echo /novel验证数据流。
  6. 最后接入espeakng,完成自动朗读系统。

学完这条线后,你已经掌握了 ROS2 Python 入门最重要的几块拼图:节点、包、构建、入口、面向对象、线程、队列、发布者、订阅者和调试命令。

http://www.jsqmd.com/news/652904/

相关文章:

  • 解决篡改猴开启开发者模式后,脚本没有生效的情况
  • 生成式AI安全审计方案落地全图谱(2024金融/医疗双行业实测版)
  • 元机器人Project MetaGenesis 项目立项申请书
  • 保姆级教程:用Python+Matplotlib复现类人机器人舞蹈动作仿真(附完整代码)
  • 别再只盯着GPS了!手把手拆解AIS的TDMA通信协议,看船舶如何“排队”报位置
  • Spring Boot 启动性能调优方案
  • OpenClaw是什么?如何部署OpenClaw?2026年阿里云配置OpenClaw及百炼Coding Plan教程
  • 别再死记硬背了!JavaScript 三大核心:ECMAScript、DOM、BOM 一文讲透
  • 解锁ESP芯片的瑞士军刀:esptool如何让固件烧录变得如此简单?
  • OpenClaw是什么?2026年OpenClaw怎么集成?9分钟云端新手安装及百炼Coding Plan流程
  • Gopeed Flutter 开源下载工具:从 IDM 替代到全平台高速下载实践
  • 从一坨面条代码开始——V1最小原型
  • 2026 .NET 面试八股文:高频题 + 答案 + 原理(进阶核心篇)
  • ICLR 2026 | 时间序列(Time Series)高分论文的Rebuttal策略与趋势洞察
  • 【2026知网/维普新规】论文AI率全线飘红怎么办?实测3步降至15%以内的核心攻略
  • 为什么92%的AI写作项目在Q2失败?2026奇点大会权威数据揭示:4类隐性语义断层正在吞噬ROI
  • 告别设备限制:Sunshine自托管游戏串流完全实践指南
  • 从策划需求到技术实现:如何为Unity项目定制一个带“动态显隐”的刷草编辑器?
  • 吴斌医生介绍
  • 国际标准采用程度是指国家标准对国际标准或国外先进标准的采纳程度,是标准化工作中的核心概念
  • 从芯片到应用:AD8302对数检波器在射频信号测量中的实战解析
  • OpenClaw是什么?OpenClaw怎么集成?2026年OpenClaw部署及阿里云百炼Coding Plan步骤
  • 3步构建高效自动化工具:大麦网抢票脚本实战指南
  • 2026年4月新发布:沧州房地产企业选择二级资质代办服务的关键指南 - 2026年企业推荐榜
  • Cadence Allegro测试点从入门到精通:手把手教你创建合规的10/50mil过孔焊盘与底层开窗
  • [Android] 学会打绳结 Knots 3D v10.8.3
  • 测试架构师核心能力:缺陷预防设计思维
  • 【Agent-阿程】AI先锋杯·14天征文挑战第14期-第8天-大模型量化压缩与轻量化部署实战
  • 学Simulink——基于Simulink的CLLC谐振变换器双向对称控制
  • 房东网络/合租上网必看:如何用一台新路由器安全搭建自己的“子网”(华硕/腾达路由器设置详解)