当前位置: 首页 > news >正文

鱼香ros学习第三章话题

本节内容来自古月居ros2 21讲 个人学习笔记

节点实现了机器人各种各样的功能,但这些功能并不是独立的,之间会有千丝万缕的联系,其中最重要的一种联系方式就是话题,它是节点间传递数据的桥梁

通信模型

以两个机器人节点为例。A节点的功能是驱动相机这个硬件设备,获取得到相机拍摄的图像信息,B节点的功能是视频监控,将相机拍摄到的图像实时显示给用户查看。

此时从节点A到节点B传递图像数据的方式,在ROS中,我们就称之为话题,它作为一个桥梁,实现了节点之间某一个方向上的数据传输。

发布/订阅模型

从话题本身的实现角度来看,使用了基于DDS的发布/订阅模型,什么叫发布和订阅呢?

话题数据传输的特性是从一个节点到另外一个节点,发送数据的对象称之为发布者,接收数据的对象称之为订阅者,每一个话题都需要有一个名字,传输的数据也需要有固定的数据类型。

打一个比方,大家平时应该也会看微信公众号,比如有一个公众号,它的名字叫做“古月居”,这个古月居就是话题名称,公众号的发布者是古月居的小编,他会把组织好的机器人知识排版成要求格式的公众号文章,发布出去,这个文章格式,就是话题的数据类型。如果大家对这个话题感兴趣,就可以订阅“古月居”,成为订阅者之后自然就可以收到古月居的公众号文章,没有订阅的话,也就无法收到。

类似这样的发布/订阅模型在生活中随处可见,比如订阅报纸、订阅杂志等等。

多对多通信

大家再仔细想下这些可以订阅的东西,是不是并不是唯一的,我们每个人可以订阅很多公众号、报纸、杂志,这些公众号、报纸、杂志也可以被很多人订阅,没错,ROS里的话题也是一样,发布者和订阅者的数量并不是唯一的,可以称之为是多对多的通信模型。

因为话题是多对多的模型,发布控制指令的摇杆可以有一个,也可以有2个、3个,订阅控制指令的机器人可以有1个,也可以有2个、3个,大家可以想象一下这个画面,似乎还是挺魔性的,如果存在多个发送指令的节点,建议大家要注意区分优先级,不然机器人可能不知道该听谁的了。

异步通信

话题通信还有一个特性,那就是异步,这个词可能有同学是第一次听说?所谓异步,只要是指发布者发出数据后,并不知道订阅者什么时候可以收到,类似古月居公众号发布一篇文章,你什么时候阅读的,古月居根本不知道,报社发出一份报纸,你什么时候收到,报社也是不知道的。这就叫做异步。

异步的特性也让话题更适合用于一些周期发布的数据,比如传感器的数据,运动控制的指令等等,如果某些逻辑性较强的指令,比如修改某一个参数,用话题传输就不太合适了。

消息接口

最后,既然是数据传输,发布者和订阅者就得统一数据的描述格式,不能一个说英文,一个理解成了中文。在ROS中,话题通信数据的描述格式称之为消息,对应编程语言中数据结构的概念。比如这里的一个图像数据,就会包含图像的长宽像素值、每个像素的RGB等等,在ROS中都有标准定义。

消息是ROS中的一种接口定义方式,与编程语言无关,我们也可以通过.msg后缀的文件自行定义,有了这样的接口,各种节点就像积木块一样,通过各种各样的接口进行拼接,组成复杂的机器人系统。

在src目录下创建功能包

ros2 pkg create demo_cpp_topic --build-type ament_cmake --dependencies rclcpp geometry_msgs turtlesim --license Apache-2.0

模板

import rclpy from rclpy.node import Node class NovelPubNode(Node): def __init__(self, node_name): super().__init__(node_name) self.get_logger().info(f'{node_name},启动!') def main(): rclpy.init() node = NovelPubNode('novel_pub') rclpy.spin(node) rclpy.shutdown()

发布者代码解析

我们来看下发布者的实现方法。

程序实现

learning_topic/topic_helloworld_pub.py

""" @作者: 古月居(www.guyuehome.com) @说明: ROS2话题示例-发布“Hello World”话题 """ import rclpy # ROS2 Python接口库 from rclpy.node import Node # ROS2 节点类 from std_msgs.msg import String # 字符串消息类型 """ 创建一个发布者节点 """ class PublisherNode(Node): def __init__(self, name): super().__init__(name) # ROS2节点父类初始化 self.pub = self.create_publisher(String, "chatter", 10) # 创建发布者对象(消息类型、话题名、队列长度) self.timer = self.create_timer(0.5, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数) def timer_callback(self): # 创建定时器周期执行的回调函数 msg = String() # 创建一个String类型的消息对象 msg.data = 'Hello World' # 填充消息对象中的消息数据 self.pub.publish(msg) # 发布话题消息 self.get_logger().info('Publishing: "%s"' % msg.data) # 输出日志信息,提示已经完成话题发布 def main(args=None): # ROS2节点主入口main函数 rclpy.init(args=args) # ROS2 Python接口初始化 node = PublisherNode("topic_helloworld_pub") # 创建ROS2节点对象并进行初始化 rclpy.spin(node) # 循环等待ROS2退出 node.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口

完成代码的编写后需要设置功能包的编译选项,让系统知道Python程序的入口,打开功能包的setup.py文件,加入如下入口点的配置:

entry_points={ 'console_scripts': [ 'topic_helloworld_pub = learning_topic.topic_helloworld_pub:main', ], },

对以上程序进行分析,如果我们想要实现一个发布者,流程如下:

  • 编程接口初始化
  • 创建节点并初始化
  • 创建发布者对象
  • 创建并填充话题消息
  • 发布话题消息
  • 销毁节点并关闭接口

订阅者代码解析

我们再来看下订阅者的实现方法。

程序实现

learning_topic/topic_helloworld_sub.py

""" @作者: 古月居(www.guyuehome.com) @说明: ROS2话题示例-订阅“Hello World”话题消息 """ import rclpy # ROS2 Python接口库 from rclpy.node import Node # ROS2 节点类 from std_msgs.msg import String # ROS2标准定义的String消息 """ 创建一个订阅者节点 """ class SubscriberNode(Node): def __init__(self, name): super().__init__(name) # ROS2节点父类初始化 self.sub = self.create_subscription(String, "chatter", self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度) def listener_callback(self, msg): # 创建回调函数,执行收到话题消息后对数据的处理 self.get_logger().info('I heard: "%s"' % msg.data) # 输出日志信息,提示订阅收到的话题消息 def main(args=None): # ROS2节点主入口main函数 rclpy.init(args=args) # ROS2 Python接口初始化 node = SubscriberNode("topic_helloworld_sub") # 创建ROS2节点对象并进行初始化 rclpy.spin(node) # 循环等待ROS2退出 node.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口

对以上程序进行分析,如果我们想要实现一个订阅者,流程如下:

  • 编程接口初始化
  • 创建节点并初始化
  • 创建订阅者对象
  • 回调函数处理话题数据
  • 销毁节点并关闭接口

发布小说

import rclpy from rclpy.node import Node import requests#请求库 from example_interfaces.msg import String#消息接口 from queue import Queue class NovelPubNode(Node): def __init__(self, node_name): super().__init__(node_name) self.novels_queue_ = Queue() # 创建队列,存放小说 # 创建话题发布者,发布小说 self.novel_publisher_ = self.create_publisher(String, 'novel', 10)# 创建发布者对象(消息类型、话题名、队列长度) self.timer_ = self.create_timer(5, self.timer_callback) # 创建定时器 def download_novel(self, url): response = requests.get(url) response.encoding = 'utf-8' self.get_logger().info(f'下载完成:{url}') for line in response.text.splitlines(): # 按行分割,放入队列 self.novels_queue_.put(line) def timer_callback(self): if self.novels_queue_.qsize() > 0: # 队列中有数据,取出发布一行 msg = String() # 实例化一个消息 msg.data = self.novels_queue_.get() # 对消息结构体进行赋值 self.novel_publisher_.publish(msg) # 发布消息 self.get_logger().info(f'发布了一行小说:{msg.data}') def main(): rclpy.init() node = NovelPubNode('novel_pub') node.download_novel('http://localhost:8000/novel1.txt') rclpy.spin(node) rclpy.shutdown()

订阅接受并朗读小说

import rclpy from rclpy.node import Node from example_interfaces.msg import String import threading from queue import Queue import time import espeakng class NovelSubNode(Node): def __init__(self, node_name): super().__init__(node_name) self.novels_queue_ = Queue()#队列 self.novel_subscriber_ = self.create_subscription(String, 'novel', self.novel_callback, 10)# 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度) self.speech_thread_ = threading.Thread(target=self.speak_thread) self.speech_thread_.start()#启动线程 def novel_callback(self, msg):#回调函数 self.novels_queue_.put(msg.data)#小说放入队列 def speak_thread(self): speaker = espeakng.Speaker() speaker.voice = 'zh'#中文 while rclpy.ok():#检测当前ros上下文是否正常 if self.novels_queue_.qsize() > 0: text = self.novels_queue_.get() self.get_logger().info(f'正在朗读 {text}') speaker.say(text)#说 speaker.wait()#说完 else: time.sleep(1)#让当前线程休眠 def main(args=None): rclpy.init(args=args) node = NovelSubNode("novel_read") rclpy.spin(node) rclpy.shutdown()

案例二:机器视觉识别

在节点概念的讲解过程中,我们通过一个节点驱动了相机,并且实现了对红色物体的识别。功能虽然没问题,但是对于机器人开发来讲,并没有做到程序的模块化,更好的方式是将相机驱动和视觉识别做成两个节点,节点间的联系就是这个图像数据,通过话题周期传输即可。

一个发布图像,一个订阅图像。

发布者代码解析

learning_topic/topic_webcam_pub.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @作者: 古月居(www.guyuehome.com) @说明: ROS2话题示例-发布图像话题 """ import rclpy # ROS2 Python接口库 from rclpy.node import Node # ROS2 节点类 from sensor_msgs.msg import Image # 图像消息类型 from cv_bridge import CvBridge # ROS与OpenCV图像转换类 import cv2 # Opencv图像处理库 """ 创建一个发布者节点 """ class ImagePublisher(Node): def __init__(self, name): super().__init__(name) # ROS2节点父类初始化 self.publisher_ = self.create_publisher(Image, 'image_raw', 10) # 创建发布者对象(消息类型、话题名、队列长度) self.timer = self.create_timer(0.1, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数) self.cap = cv2.VideoCapture(0) # 创建一个视频采集对象,驱动相机采集图像(相机设备号) self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于稍后将OpenCV的图像转换成ROS的图像消息 def timer_callback(self): ret, frame = self.cap.read() # 一帧一帧读取图像 if ret == True: # 如果图像读取成功 self.publisher_.publish( self.cv_bridge.cv2_to_imgmsg(frame, 'bgr8')) # 发布图像消息 self.get_logger().info('Publishing video frame') # 输出日志信息,提示已经完成图像话题发布 def main(args=None): # ROS2节点主入口main函数 rclpy.init(args=args) # ROS2 Python接口初始化 node = ImagePublisher("topic_webcam_pub") # 创建ROS2节点对象并进行初始化 rclpy.spin(node) # 循环等待ROS2退出 node.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口

订阅者代码解析

learning_topic/topic_webcam_sub.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @作者: 古月居(www.guyuehome.com) @说明: ROS2话题示例-订阅图像话题 """ import rclpy # ROS2 Python接口库 from rclpy.node import Node # ROS2 节点类 from sensor_msgs.msg import Image # 图像消息类型 from cv_bridge import CvBridge # ROS与OpenCV图像转换类 import cv2 # Opencv图像处理库 import numpy as np # Python数值计算库 lower_red = np.array([0, 90, 128]) # 红色的HSV阈值下限 upper_red = np.array([180, 255, 255]) # 红色的HSV阈值上限 """ 创建一个订阅者节点 """ class ImageSubscriber(Node): def __init__(self, name): super().__init__(name) # ROS2节点父类初始化 self.sub = self.create_subscription( Image, 'image_raw', self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度) self.cv_bridge = CvBridge() # 创建一个图像转换对象,用于OpenCV图像与ROS的图像消息的互相转换 def object_detect(self, image): hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 图像从BGR颜色模型转换为HSV模型 mask_red = cv2.inRange(hsv_img, lower_red, upper_red) # 图像二值化 contours, hierarchy = cv2.findContours( mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) # 图像中轮廓检测 for cnt in contours: # 去除一些轮廓面积太小的噪声 if cnt.shape[0] < 150: continue (x, y, w, h) = cv2.boundingRect(cnt) # 得到苹果所在轮廓的左上角xy像素坐标及轮廓范围的宽和高 cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2)# 将苹果的轮廓勾勒出来 cv2.circle(image, (int(x+w/2), int(y+h/2)), 5, (0, 255, 0), -1) # 将苹果的图像中心点画出来 cv2.imshow("object", image) # 使用OpenCV显示处理后的图像效果 cv2.waitKey(10) def listener_callback(self, data): self.get_logger().info('Receiving video frame') # 输出日志信息,提示已进入回调函数 image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8') # 将ROS的图像消息转化成OpenCV图像 self.object_detect(image) # 苹果检测 def main(args=None): # ROS2节点主入口main函数 rclpy.init(args=args) # ROS2 Python接口初始化 node = ImageSubscriber("topic_webcam_sub") # 创建ROS2节点对象并进行初始化 rclpy.spin(node) # 循环等待ROS2退出 node.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口

发布速度控制海龟转圈圈

import rclpy from rclpy.node import Node import requests#请求库 from example_interfaces.msg import String#消息接口 from queue import Queue from geometry_msgs.msg import Twist class NovelPubNode(Node): def __init__(self, node_name): super().__init__(node_name) # 创建话题发布者,发布海龟命令 self.turtle_circle_publisher_ = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)# 创建发布者对象(消息类型、话题名、队列长度) self.timer_ = self.create_timer(1, self.timer_callback) # 创建定时器 def timer_callback(self): twist=Twist()#组装数据 twist.linear.x=1.0 twist.angular.z=0.5 self.turtle_circle_publisher_.publish(twist) # 发布消息 def main(): rclpy.init() node = NovelPubNode('turtle_circle') rclpy.spin(node) rclpy.shutdown()

海龟控制,使一直海龟到达另一只海龟位置

import rclpy from rclpy.node import Node import requests#请求库 from turtlesim.msg import Pose#消息接口 from queue import Queue from geometry_msgs.msg import Twist import numpy as np import math class TurtlePubNode(Node): def __init__(self, node_name): super().__init__(node_name) # 创建话题发布者,发布海龟命令 self.turtle_control_publisher_ = self.create_publisher(Twist, '/turtle2/cmd_vel', 10)# 创建发布者对象(消息类型、话题名、队列长度) self.turtle_control_subscriber_ = self.create_subscription(Pose, '/turtle2/pose', self.pose_callback, 10)# 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度) self.turtle_control2_subscriber_ = self.create_subscription(Pose, '/turtle1/pose', self.pose_callback2, 10)# 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度) #目标位置 self.target_x_ = 5.0 #// 目标位置X,设置默认值1.0 self.target_y_ = 5.0 #// 目标位置Y,设置默认值1.0 self.k_ = 1.0 #// 比例系数,控制输出=误差*比例系数 self.max_speed_ = 3.0 #// 最大线速度,设置默认值3.0 def pose_callback2(self,turtle_pose2): self.target_x_ = turtle_pose2.x #// 目标位置X,设置默认值1.0 self.target_y_ = turtle_pose2.y #// 目标位置Y,设置默认值1.0 def pose_callback(self,turtle_pose): # 1. 记录当前位置 current_x = turtle_pose.x current_y = turtle_pose.y #self.get_logger().info(f'当前位置:(x={current_x},y={current_y})') # 2. 计算距离目标的距离,与当前海龟朝向的角度差 distance = math.sqrt((self.target_x_ - current_x) ** 2 + (self.target_y_ - current_y) ** 2) angle = math.atan2(self.target_y_ - current_y, self.target_x_ - current_x) - turtle_pose.theta # 3. 控制策略:距离大于0.1继续运动,角度差大于0.2则原地旋转,否则直行 twist_msg = Twist() if distance > 0.1: if abs(angle) > 0.2: twist_msg.angular.z = abs(angle) else: # 通过比例控制器计算输出速度 twist_msg.linear.x = self.k_ * distance # 4. 限制最大值 if twist_msg.linear.x > self.max_speed_: twist_msg.linear.x = self.max_speed_ # 发布消息 self.turtle_control_publisher_.publish(twist_msg) # 发布消息 def main(): rclpy.init() node = TurtlePubNode('turtle_control') rclpy.spin(node) rclpy.shutdown()

自定义通信接口

ros2 pkg create status_interfaces --dependencies builtin_interfaces rosidl_default_genterators --license Apache-2.0
builtin_interfaces/Time stamp # 记录时间戳 string host_name # 系统名称 float32 cpu_percent # CPU使用率 float32 memory_percent # 内存使用率 float32 memory_total # 内存总量 float32 memory_available # 剩余有效内存 float64 net_sent # 网络发送数据总量 float64 net_recv # 网络接收数据总量

只能用C语言去建立通信接口 见视频《ROS 2机器人开发从入门到实践》3.4.2自定义通信接口_哔哩哔哩_bilibili

python发布该自定义话题

import rclpy from rclpy.node import Node from status_interfaces.msg import SystemStatus # 导入消息接口 import psutil import platform class SysStatusPub(Node): def __init__(self, node_name): super().__init__(node_name) self.status_publisher_ = self.create_publisher(SystemStatus, 'sys_status', 10)#创建发布者 self.timer = self.create_timer(1, self.timer_callback)#定时器 def timer_callback(self): cpu_percent = psutil.cpu_percent()#CPU使用率 memory_info = psutil.virtual_memory()#内存信息 net_io_counters = psutil.net_io_counters()#网络信息 msg = SystemStatus() msg.stamp = self.get_clock().now().to_msg()#当前时间 msg.host_name = platform.node() msg.cpu_percent = cpu_percent msg.memory_percent = memory_info.percent msg.memory_total = memory_info.total / 1024 / 1024 msg.memory_available = memory_info.available / 1024 / 1024 msg.net_sent = net_io_counters.bytes_sent / 1024 / 1024 msg.net_recv = net_io_counters.bytes_recv / 1024 / 1024 self.get_logger().info(f'发布:{str(msg)}') self.status_publisher_.publish(msg) def main(): rclpy.init() node = SysStatusPub('sys_status_pub') rclpy.spin(node) rclpy.shutdown()

用python接受该话题并进行可视化显示

import sys import threading import rclpy from rclpy.node import Node from PyQt5.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget from PyQt5.QtCore import Qt from status_interfaces.msg import SystemStatus class SysStatusDisplay(Node): def __init__(self): super().__init__('sys_status_display') self.subscription = self.create_subscription(SystemStatus,'sys_status',self.listener_callback,10)#订阅者 self.label_text = QLabel("等待系统状态消息...") def listener_callback(self, msg): show_str = ( f"===========系统状态可视化显示工具============\n" f"数 据 时 间:\t{msg.stamp.sec} s\n" f"用 户 名:\t{msg.host_name}\t\n" f"CPU使用率:\t{msg.cpu_percent} %\n" f"内存使用率:\t{msg.memory_percent} %\n" f"内存总大小:\t{msg.memory_total} MB\n" f"剩余有效内存:\t{msg.memory_available} MB\n" f"网络发送量:\t{msg.net_sent} MB\n" f"网络接收量:\t{msg.net_recv} MB\n" f"==========================================" ) self.label_text.setText(show_str) self.label_text.show() def main(): app = QApplication(sys.argv) rclpy.init() node = SysStatusDisplay('display') def spin(): rclpy.spin(node) spin_thread = threading.Thread(target=spin) spin_thread.daemon = True # 确保主程序退出时线程也会退出 spin_thread.start() app.exec_()
http://www.jsqmd.com/news/654883/

相关文章:

  • Latex排版+实验设计:我是如何在家‘纸上谈兵’完成TCSVT顶会论文初稿的
  • RVC WebUI界面详解:每个按钮功能说明,小白秒懂操作
  • 知名企业家诉讼离婚请律师委托费多少,有哪些上海本地的律师推荐 - 工业设备
  • 2026年靠谱的图像质量测试设备型号推荐,摄像头测试设备多少钱揭秘 - mypinpai
  • 引用vs指针
  • 从Prompt注入到训练数据投毒:生成式AI全链路隐私攻击图谱(2024最新ATTCK for AI v2.1)
  • R| 纵向数据可视化:用增强版云雨图(Raincloudplots)揭示时间序列变化
  • 802.11AX资源调度探秘:NDP反馈报告(NFR)机制详解
  • 2026年4月佛山顺德五金模具定制供应商深度对标指南——金属制品与五金配件采购避坑全攻略 - 精选优质企业推荐官
  • Windows虚拟机CPU跑满?别急着重启,用perf和火焰图揪出QEMU-KVM里的“电老虎”
  • 2026移民美国中介排名及行业服务参考 - 品牌排行榜
  • 甘肃万通技工学校教学方法大揭秘,专业是否靠谱一看便知 - 工业设备
  • 抖音无水印批量下载实战指南:3分钟搞定高效内容管理
  • 双硬盘用户必看!DISM++安装Win10 22H2时如何避免误删数据盘(含DiskGenius分区详解)
  • 3步掌握StreamFX:OBS视频特效插件的终极指南
  • 重磅合作|大宇云与胡润独角兽E签宝达成代理合作,共启数字化服务新征程 - 速递信息
  • Qt_笔记
  • 终极Windows更新修复方案:Reset Windows Update Tool完整指南
  • 如何彻底掌控你的数字记忆:WeChatMsg让你的聊天数据真正属于你
  • 图论——岛屿数量
  • 牛客Top200---合并区间 (Java实战:从图解到代码的完整通关)
  • 别再到处找了!2024最新银河麒麟V10全版本(飞腾/龙芯/兆芯)官方下载与安装保姆级教程
  • 2026兰州好吃的涮羊肉指南:滩羊肉店推荐-清真羊胜记铜锅涮肉・爆肚 (天水路店),好吃不踩雷 - 栗子测评
  • 打通业财壁垒,破解“两张皮”难题——融智天费用控制系统业财一体化体验 - 业财科技
  • 可扩散模型(Diffusion Models)详解:从原理到应用
  • Qt桌面应用现代化改造:用AdvancedDockingSystem打造可拖拽停靠的‘IDE级’主界面(搭配自制Ribbon菜单)
  • 2025年500米分辨率的地形粗糙度栅格数据(全球/全国)
  • django-push-notifications错误处理与调试:解决常见推送问题
  • 农历计算的技术挑战与lunar-javascript的解决方案:构建高效的传统历法系统
  • 如何理解Tomcat、Servlet、Catanalina的关系