ROS2 Python 教学合并版:从环境搭建到 Topic 通信实战
- 搭建 ROS2 Humble + WSL2 + Ubuntu 22.04 开发环境。
- 编写第一个 Python 节点,理解
rclpy、Node、spin。 - 从普通 Python 类进化到正式 ROS2 节点类。
- 学习多线程与回调,避免耗时任务卡死节点。
- 完成一个Topic 发布订阅与语音朗读项目。
一、环境搭建:ROS2 Humble + WSL2 + Ubuntu 22.04
1.1 安装思路
推荐使用 Ubuntu 22.04 搭配 ROS2 Humble。Humble 是 Ubuntu 22.04 对应的 ROS2 LTS 版本,资料多、插件丰富、稳定性适合入门和项目练习。
可以使用鱼香 ROS 的一键安装脚本:
wgethttp://fishros.com/install-Ofishros&&.fishros这类脚本通常会完成换源、添加 ROS2 软件源、安装 ROS2 基础组件等步骤。国内网络环境下,换成清华、华为等镜像源可以明显减少下载失败和超时。
1.2 验证 ROS2 是否安装成功
ros2--version如果能正常输出版本信息,说明ros2命令已经进入当前终端环境。
1.3 VS Code 开发环境
推荐安装这些插件:
- WSL
- Remote Development
- Python
- ROS
WSL 插件负责让 VS Code 进入 Ubuntu 子系统;Python 插件负责补全、跳转、调试;ROS 插件可以辅助识别 ROS2 工作空间、包和消息类型。
二、ROS2 第一个 Python 节点
2.1 节点最小代码
先从最小节点开始理解 ROS2 程序的骨架。
importrclpyfromrclpy.nodeimportNodedefmain():rclpy.init()# 初始化 ROS2 通信环境node=Node('py_node')# 创建节点,节点名为 py_nodenode.get_logger().info('你好 ROS2!')rclpy.spin(node)# 循环监听rclpy.shutdown()# 释放资源2.2 关键语句解释
rclpy.init()像开机启动。它会初始化当前进程的 ROS2 通信环境,包括上下文、网络发现和底层通信资源。
Node('py_node')给程序一个节点身份。在 ROS2 网络里,节点名就像身份证,其他工具和节点可以通过这个名字识别它。
node.get_logger().info()是 ROS2 推荐的日志输出方式。它比print更适合机器人系统,因为日志会带时间戳、等级,并且能被rqt_console等工具查看。
rclpy.spin(node)是节点持续运行的关键。没有spin,Python 脚本执行完就退出;有了spin,节点会不断检查是否有消息、定时器或其他回调需要执行。
rclpy.shutdown()用于优雅退出,通知 ROS2 回收资源。
三、规范化开发:工作空间与功能包
ROS2 项目不建议把代码零散放在任意目录里,而是通过工作空间和功能包管理。
3.1 创建工作空间
mkdir-p~/ros2_ws/srccd~/ros2_ws/srcros2_ws是工作空间,src用于存放功能包源码。
3.2 创建 Python 功能包
ros2 pkg create --build-type ament_python--licenseApache-2.0 demo_python_pkg--build-type ament_python表示这是一个 Python 包。demo_python_pkg是包名。
3.3 编译与运行前置环境
回到工作空间根目录:
cd~/ros2_ws colcon buildsourceinstall/setup.bashcolcon build会检查依赖、执行安装脚本,把 Python 源码安装到install目录,并生成环境脚本。
source install/setup.bash会更新当前终端的环境变量,例如PATH和PYTHONPATH。如果忘了执行,ros2 run可能会找不到刚刚编写的包或节点。
四、从 Python 类进化到 ROS2 节点
ROS2 Python 节点本质上通常是一个继承自Node的类。先理解普通 Python 类,再接入 ROS2,会更顺。
4.1 阶段一:普通 Python 类
classPersonNode:def__init__(self,name_value:str,age_value:int)->None:self.name=name_value self.age=age_valuedefeat(self,food_name:str):print(f"{self.name},{self.age}岁,爱吃{food_name}")__init__是构造函数。当执行node = PersonNode(...)时,Python 会自动调用它。它常用于初始化对象属性。
self代表对象自己。没有self,变量只是函数内部的临时变量;有了self,变量会跟着对象一起存在。
4.2 阶段二:多实例验证
defmain():node=PersonNode('法外狂徒张三',18)node1=PersonNode('法外狂徒小王',89)node.eat('鱼香肉丝')node1.eat('西红柿')同一个类可以创建多个对象,每个对象都有自己的name和age。在机器人程序里,这对应一种很常见的写法:写一个MotorController类,再创建left_motor、right_motor两个实例。
4.3 阶段三:继承与super()
新建writer_node.py:
fromdemo_python_pkg.person_nodeimportPersonNodeclassWriterNode(PersonNode):def__init__(self,name:str,age:int,book:str)->None:super().__init__(name,age)self.book=bookclass WriterNode(PersonNode)表示WriterNode继承PersonNode。子类会自动拥有父类的方法,例如eat。
super().__init__(name, age)用于调用父类构造函数。如果不调用,父类中的name和age就不会被正确初始化。
4.4 阶段四:正式成为 ROS2 节点
importrclpyfromrclpy.nodeimportNodeclassPersonNode(Node):def__init__(self,node_name:str,name_value:str,age_value:int)->None:super().__init__(node_name)self.name=name_value self.age=age_valuedefeat(self,food_name:str):self.get_logger().info(f"{self.name},{self.age}岁,爱吃{food_name}")这里的变化有三个:
PersonNode继承了 ROS2 的Node类。super().__init__(node_name)把节点注册进 ROS2 通信系统。self.get_logger().info()替代了print,输出变成 ROS2 日志。
如果不调用super().__init__(node_name),这个对象就无法真正成为 ROS2 节点,也无法调用self.get_logger()。
4.5 在setup.py中注册运行入口
'console_scripts':['person_node = demo_python_pkg.person_node:main','writer_node = demo_python_pkg.writer_node:main',],这相当于给节点注册快捷方式。当执行:
ros2 run demo_python_pkg person_nodeROS2 会根据console_scripts找到demo_python_pkg.person_node里的main函数并执行。
五、多线程下载器与回调机制
在机器人程序里,传感器读取、图片处理、网络请求、语音合成等任务可能很耗时。如果这些任务直接写在 ROS2 回调里,节点就容易卡住。
5.1 准备本地 HTTP 文件服务
创建测试小说文件:
echo"第一章 少年踏上修仙路">novel1.txtecho"第二章 学习修仙,马上就上天">novel2.txt启动本地 HTTP 服务:
python3-mhttp.server8000--bind0.0.0.05.2 多线程下载器
importthreadingimportrequestsclassDownLoader:defdownload(self,url,callback_func):print(f'线程:{threading.get_ident()}开始下载')response=requests.get(url)response.encoding='utf-8'callback_func(url,response.text)defstart_download(self,url,callback_func):thread=threading.Thread(target=self.download,args=(url,callback_func))thread.start()requests.get(url)是网络 IO 操作,可能阻塞当前线程。放到子线程后,主程序可以继续运行。
threading.Thread(target=..., args=...)表示创建一个子线程,target是子线程要执行的函数,args是传给函数的参数。
callback_func是回调函数。下载完成后,下载器主动调用它,把结果交回去。这是一种“干完活再通知你”的模式。
5.3 为什么 ROS2 节点需要理解多线程
ROS2 节点里的消息接收、定时器等逻辑都是回调。如果在某个回调里执行耗时任务,例如几秒钟的图像处理,节点可能在这段时间里无法处理新的消息。
常见处理思路有两种:
- 把耗时任务交给独立线程或线程池。
- 使用 ROS2 的多线程执行器,例如
MultiThreadedExecutor。
5.4 常见问题
为什么要设置response.encoding = 'utf-8'?
因为中文内容在网络响应里可能无法被自动识别为 UTF-8,手动设置可以避免乱码。
为什么start_download不直接写self.download(...)?
直接调用就是同步执行,主程序仍然会等待下载完成。只有通过threading.Thread包装并启动,下载才会在后台异步运行。
threading.get_ident()是什么?
它会返回当前线程 ID,常用于验证代码是否真的运行在子线程中。
六、Topic 通信实战:修仙小说自动朗读系统
这一部分把前面的节点、队列、多线程和回调串起来,完成一个 ROS2 Topic 项目:
- 发布者:下载小说文本,把每一行发布到
/novel话题。 - 订阅者:订阅
/novel话题,把收到的文本交给语音引擎朗读。
6.1 创建新的工作空间与功能包
mkdir-p~/chapt3_ws/srccd~/chapt3_ws/src ros2 pkg create demo_python_topic --build-type ament_python--dependenciesrclpy example_interfaces--licenseApache-2.0rclpy是 Python 节点必需依赖。example_interfaces提供标准消息类型,例如String。
6.2 准备小说文件服务器
在当前文件夹下新建novel1.txt:
第一章 少年踏上修仙路 第二章 学习修仙,马上上天 第三章 女大三千,位列仙班启动 HTTP 服务:
python3-mhttp.server8000--bind0.0.0.06.3 发布者第一版:只下载不发布
novel_pub_node.py:
importrclpyfromrclpy.nodeimportNodeimportrequestsclassNovelPubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动!')defdownload(self,url):response=requests.get(url)response.encoding='utf-8'self.get_logger().info(f'下载完成:{url}, 长度:{len(response.text)}')defmain():rclpy.init()node=NovelPubNode('novel_pub')node.download('http://localhost:8000/novel1.txt')rclpy.spin(node)rclpy.shutdown()这一版先验证节点可以启动,并且能够通过 HTTP 下载小说内容。
6.4 发布者第二版:发布 Topic
目标:把下载到的小说内容切分成行,然后定时发布到novel话题。
importrclpyfromrclpy.nodeimportNodeimportrequestsfromexample_interfaces.msgimportStringfromqueueimportQueueclassNovelPubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动!')self.novels_queue_=Queue()self.nobel_pubblisher=self.create_publisher(String,'novel',10)self.create_timer(3.0,self.timer_callback)deftimer_callback(self):ifself.novels_queue_.qsize()>0:line=self.novels_queue_.get()msg=String()msg.data=line self.nobel_pubblisher.publish(msg)self.get_logger().info(f'发布了:{msg.data}')self.novels_queue_.put(line)defdownload(self,url):response=requests.get(url)response.encoding='utf-8'forlineinresponse.text.splitlines():ifline.strip():self.novels_queue_.put(line)核心点:
create_publisher(String, 'novel', 10)创建发布者。String是消息类型,发布和订阅两端必须一致。'novel'是话题名,ROS2 中实际显示时通常会看到/novel。10是消息队列深度,接收方处理慢时可以缓存最近的消息。create_timer(3.0, self.timer_callback)每 3 秒触发一次发布逻辑。- 不要用
while True持续发布,否则可能卡住节点其他功能。
6.5 订阅者:接收文本并语音朗读
安装语音引擎:
sudoaptupdatesudoaptinstallespeak-ng-ypip3installespeakngnovel_sub_node.py:
importrclpyfromrclpy.nodeimportNodefromexample_interfaces.msgimportStringfromqueueimportQueueimportthreadingimporttimeimportespeakngclassNovelSubNode(Node):def__init__(self,node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动!')self.novels_queue_=Queue()self.nobels_subscriber_=self.create_subscription(String,'novel',self.novel_callback,10)self.speech_thread_=threading.Thread(target=self.speake_thread)self.speech_thread_.start()defnovel_callback(self,msg):self.novels_queue_.put(msg.data)self.get_logger().info(f'收到新章节:{msg.data}')defspeake_thread(self):speaker=espeakng.Speaker()speaker.voice='zh'whilerclpy.ok():ifself.novels_queue_.qsize()>0:text=self.novels_queue_.get()self.get_logger().info(f'正在朗读:{text}')speaker.say(text)speaker.wait()else:time.sleep(1)defmain():rclpy.init()node=NovelSubNode('novel_sub')rclpy.spin(node)rclpy.shutdown()订阅者里有两个关键设计:
create_subscription(String, 'novel', self.novel_callback, 10)订阅发布者的同名话题。- 语音朗读放进独立线程,避免
speaker.wait()卡住 ROS2 主线程。
rclpy.ok()可以感知节点是否还在运行。当按下Ctrl+C后,后台线程能跟着退出。
6.6 注册 Topic 节点入口
修改setup.py:
'console_scripts':['novel_pub_node = demo_python_topic.novel_pub_node:main','novel_sub_node = demo_python_topic.novel_sub_node:main',],如果你还在同一个教学包里整理全部案例,可以把入口统一写成:
'console_scripts':['person_node = demo_python_pkg.person_node:main','writer_node = demo_python_pkg.writer_node:main','learn_thread = demo_python_pkg.learn_thread:main','novel_pub_node = demo_python_topic.novel_pub_node:main','novel_sub_node = demo_python_topic.novel_sub_node:main',],实际项目里,入口属于哪个包,就应写在那个包自己的setup.py中。
6.7 编译与联调
在工作空间根目录执行:
colcon buildsourceinstall/setup.bash终端 1 启动发布者:
ros2 run demo_python_topic novel_pub_node终端 2 启动订阅者:
sourceinstall/setup.bash ros2 run demo_python_topic novel_sub_node如果发布者和订阅者都正常运行,订阅者会收到发布者发来的小说文本,并调用语音引擎朗读。
6.8 Topic 调试命令
查看当前 ROS2 网络中的话题:
ros2 topic list直接查看/novel话题内容:
ros2 topicecho/novel查看/novel发布频率:
ros2 topic hz /novel查看发布者节点信息:
ros2nodeinfo /novel_pub排查顺序建议:
- 先用
ros2 topic list看有没有/novel。 - 再用
ros2 topic echo /novel看有没有数据。 - 如果 echo 有数据,发布者没大问题,重点查订阅者。
- 如果 echo 没数据,重点查发布者、下载逻辑、定时器和
source install/setup.bash。
七、核心知识总结
7.1 ROS2 节点生命周期
一个最小 Python 节点通常包含:
rclpy.init()node=Node('node_name')rclpy.spin(node)rclpy.shutdown()类形式节点通常写成:
classMyNode(Node):def__init__(self):super().__init__('my_node')7.2 工作空间运行规律
每次修改代码后,通常需要:
colcon buildsourceinstall/setup.bash新开终端后,也要重新执行source install/setup.bash。
7.3 Topic 发布订阅规律
发布者负责:
publisher=self.create_publisher(String,'topic_name',10)publisher.publish(msg)订阅者负责:
subscriber=self.create_subscription(String,'topic_name',callback,10)两端的消息类型和话题名必须一致。
7.4 为什么使用Queue
下载、发布、朗读的速度可能完全不同。Queue可以把这些环节解耦:
- 下载快了,先把数据放进队列。
- 发布按定时器慢慢取。
- 朗读更慢,也可以按自己的节奏消费。
这样各模块互不硬等,节点更稳定。
7.5 避坑清单
- 忘记
source install/setup.bash:ros2 run找不到包或入口。 - 没有
rclpy.spin(node):节点启动后立刻结束。 - 没有
super().__init__(node_name):类无法真正注册为 ROS2 节点。 - 发布和订阅话题名不一致:订阅者收不到消息。
- 发布和订阅消息类型不一致:通信失败或行为异常。
- 在回调里执行耗时任务:节点卡死、消息堆积。
- 中文 HTTP 文本没设置 UTF-8:下载内容乱码。
八、学习路线建议
建议按以下顺序练习:
- 只运行第一个
py_node,确认 ROS2 基础环境可用。 - 创建
demo_python_pkg,练习person_node和writer_node。 - 用
learn_thread.py单独理解线程和回调。 - 创建
demo_python_topic,先写发布者,再写订阅者。 - 用
ros2 topic echo /novel验证数据流。 - 最后接入
espeakng,完成自动朗读系统。
学完这条线后,你已经掌握了 ROS2 Python 入门最重要的几块拼图:节点、包、构建、入口、面向对象、线程、队列、发布者、订阅者和调试命令。
