ROS2基础篇
万字入门ROS2基础篇!
一、工作空间
1.1 ROS2 的典型目录结构
ros2/
├── build/ # 编译中间文件(colcon自动生成)
├── install/ # 安装后的文件(可运行的节点等)
├── log/ # 构建日志
└── src/ # 你的源代码包都放在这里
- src:代码空间。未来编写的代码、脚本,都需要人为的放置到这里;
- build:编译空间。保存编译过程中产生的中间文件;
- install:安装空间。放置编译得到的可执行文件和脚本;
- log:日志空间。编译和运行过程中,保存各种警告、错误、信息等日志。
1.2 安装项目依赖
创建工作空间(ros2)和代码空间(src),然后在代码空间下放置一个项目。不管是什么项目,一定会需要各种依赖。我们可以手动一个一个安装,也可以使用 rosdep 工具自动安装。但是国内使用 rosdep 会超时。因此国内大佬“鱼香ROS”基于 rosdep 源码制作了 rosdepc,专门服务国内ROS用户。本文之后,世上再无rosdep更新失败问题!https://zhuanlan.zhihu.com/p/398754989 使用方法如下:
$ cd ros2/src
$ sudo pip3 install rosdepc
$ sudo rosdepc init
$ rosdepc update
$ cd ..
$ rosdepc install -i --from-path src --rosdistro jazzy -y # jazzy为ROS2的版本
1.3 生成工作空间
安装依赖后,就可以编译了。
$ cd ros2
$ colcon build
1.4 设置环境变量
编译成功后,为了让系统能够找到我们的功能包和可执行文件,还需要设置环境变量。
$ source install/local_setup.sh # 仅在当前终端生效
为了不用每次打开终端都输入这条指令,可以在 /home 文件夹下找到隐藏文件(ctrl + h).bashrc。在文件末尾添加刚才环境变量地址。
二、功能包
如果把 工作区(workspace) 想成一个 大工地,而 功能包(package) 则是里面的 一个个工棚/模块。比如:
ros2_ws/ ← 工作区
├── src/
│ ├── lidar_driver/ ← 包1:激光雷达驱动
│ ├── nav_system/ ← 包2:导航系统
│ └── my_robot/ ← 包3:机器人本体代码
├── build/ ← 编译缓存
├── install/ ← 可执行文件/库等
└── log/ ← 编译日志
我们把不同功能的代码划分到不同的功能包中,尽量降低他们之间的耦合关系,当需要在ROS社区中分享给别人的时候,只需要说明这个功能包该如何使用,别人很快就可以用起来了。所以功能包的机制,是提高ROS中软件复用率的重要方法之一。
2.1 创建功能包
$ ros2 pkg create --build-type <build-type> <package_name>
- build-type:表示新创建的功能包是C++还是Python的,如果使用C++或者C,那这里就跟ament_cmake,如果使用Python,就跟ament_python;
- package_name:新建功能包的名字。
比如:
$ ros2 pkg create --build-type ament_python pkg_python
2.2 编译功能包
同理,功能包的代码也需要在工作空间编译后才能运行。编译后,可以用如下命令看是否编译成功:
$ ros2 pkg list | grep [包名]
三、节点
上面我们提到,功能包像是工地的工棚,那么节点就相当于工棚里的不同职责的工人。
举个简单的例子:
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
self.get_logger().info("节点启动成功!")
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node) # 持续运行,等待事件
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
其中 rclpy 为节点的 python 依赖,如果是 C++,则使用 rclcpp 。其中导入的 Node 类是一个基类,可以添加 Topic、Service、Action 与其他节点通信。
之后所有的程序在编写完代码后,都要在 setup.py 中注册节点信息,即在 console_scripts 中添加节点:
from setuptools import setup
package_name = 'pkg_name'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
install_requires=['setuptools'],
zip_safe=True,
maintainer='your_name',
maintainer_email='your_email@example.com',
description='Simple node example',
license='Apache License 2.0',
entry_points={
'console_scripts': [
'my_node = pkg_name.my_node:main',
],
},
)
最后,使用如下命令运行:
$ ros2 run [包名] [节点名] # 包名:pkg_name; 节点名:my_node
上面的示例代码的节点名为 'my_node' 。运行以后,可以查看当前所有运行的节点:
$ ros2 node list # 查看节点列表
$ ros2 node info <node_name> # 查看节点信息
四、话题(Topic)
如果节点想要发送/接收数据(发布/订阅),那么就需要话题(Topic)。之前说到,节点 Node 类似于工棚里的工人,那么话题就相当于工人的对讲机,因此节点之间的 topic 通信是单向传播的。
4.1 发布者
编写一个名为 'talker' 的发布者节点,话题为 'chatter'。
class TalkerNode(Node):
def __init__(self):
super().__init__('talker')
# 创建发布者对象(消息类型、话题名、队列长度)
self.publisher_ = self.create_publisher(String, 'chatter', 10)
# 创建一个定时器(单位为秒的周期,定时执行的回调函数)
self.timer = self.create_timer(1.0, self.timer_callback)
self.count = 0
def timer_callback(self):
msg = String()
msg.data = f"Hello ROS2: {self.count}"
self.publisher_.publish(msg)
self.get_logger().info(f"Published: {msg.data}")
self.count += 1
4.2 订阅者
订阅者节点通过话题名 'chatter' 获取发布者节点的消息(msg)。
class ListenerNode(Node):
def __init__(self):
super().__init__('listener')
# 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
self.subscription = self.create_subscription(
String,
'chatter',
self.listener_callback,
10
)
def listener_callback(self, msg):
self.get_logger().info(f"Received: {msg.data}")
编写好代码后,要在 setup.py 中的 console_scripts 添加这两行节点信息:
'talker = py_pubsub.talker_node:main',
'listener = py_pubsub.listener_node:main',
注意: 节点名为talker,文件名为talker_node.py。
$ ros2 run py_pubsub talker # 正确
$ ros2 run py_pubsub talker_node # 错误
五、服务(Service)
如果节点需要请求服务,然后等待结果(客户端/服务端),那么就需要服务(Service)。 topic 类似于对讲机,那么服务则类似于电话,你问我答,一种双向通信。
5.1 服务器(Server)
创建一个 'add_two_ints_server' 的服务器节点,一个 'add_two_ints' 的服务。AddTwoInts 来自 ROS2 的内置的一个服务接口包(example_interfaces),专门用来做加法演示或测试。
from example_interfaces.srv import AddTwoInts
class AddTwoIntsServer(Node):
def __init__(self):
super().__init__('add_two_ints_server')
# 创建服务器对象(接口类型、服务名、服务器回调函数)
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_callback)
self.get_logger().info('加法服务已启动')
# 回调函数名、参数名可以任取,但参数的个数、含义不变
def add_callback(self, request, response):
self.get_logger().info(f'收到请求: {request.a} + {request.b}')
response.sum = request.a + request.b
return response
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsServer()
rclpy.spin(node)
rclpy.shutdown()
5.2 客户端(Client)
from example_interfaces.srv import AddTwoInts
class AddTwoIntsClient(Node):
def __init__(self):
super().__init__('add_two_ints_client')
# 创建服务客户端对象(服务接口类型,服务名)
self.client = self.create_client(AddTwoInts, 'add_two_ints')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待服务端上线...')
self.request = AddTwoInts.Request()
self.request.a = 7
self.request.b = 5
# 异步方式发送服务请求
self.future = self.client.call_async(self.request)
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsClient()
while rclpy.ok():
rclpy.spin_once(node)
if node.future.done():
try:
response = node.future.result()
node.get_logger().info(f'结果是: {response.sum}')
except Exception as e:
node.get_logger().error(f'服务调用失败: {e}')
break
node.destroy_node()
rclpy.shutdown()
运行后,可以通过如下命令查看服务:
$ ros2 service list # 查看服务列表
$ ros2 service type <service_name> # 查看服务数据类型
$ ros2 service call <service_name> <service_type> <service_data> # 发送服务请求
六、动作(Action)
Service 适合“立刻完成的操作”,而动作(Action)适合“耗时的、可取消的过程”。 比如:获取当前温度、启动一个动作,就可以用 Service,但是移动机械臂、走一段路径、上传一个文件必须用 Action。
从下图可以看出,Action 是基于 Service 和 Topic 来完成的。
- Goal Service:相当于 Client。需要客户端发出目标,当然也可以取消目标。
- Result Service:相当于 Server。服务器端拿到客户端发送的目标,分配 ID。如果客户端取消了发送,服务器端也应该返回是否取消成功。
- Feedback Topic: 一种由服务器到客户端的单向 Topic。包含当前所有目标任务的状态(比如:接受中、执行中、已完成)。
6.1 结果(Result)
首先,服务器需要创建动作服务器 ActionServer ,动作服务器需要传入一个回调函数 execute_callback 来处理客户端发来的目标动作。MoveCircle 是一个自定义的动作接口,ROS2 根据自定义的动作接口(.action 文件)会编译生成动作需要的 Goal() 、Feedback() 和 Result() 相关方法。其中的 Result() 方法就是这里谈的结果。
from rclpy.action import ActionServer # ROS2 动作服务器类
from learning_interface.action import MoveCircle # 自定义的圆周运动接口
class MoveCircleActionServer(Node):
def __init__(self, name):
super().__init__(name)
# 创建动作服务器(接口类型、动作名、回调函数)
self._action_server = ActionServer(
self,
MoveCircle,
'move_circle',
self.execute_callback)
# 执行收到动作目标之后的处理函数
def execute_callback(self, goal_handle):
# 创建一个动作反馈信息的消息
feedback_msg = MoveCircle.Feedback()
# 从0到360度,执行圆周运动,并周期反馈信息
for i in range(0, 360, 30):
feedback_msg.state = i # 创建反馈信息,表示当前执行到的角度
self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)
goal_handle.publish_feedback(feedback_msg) # 发布反馈信息
time.sleep(0.5)
# 动作执行成功
goal_handle.succeed()
# 创建结果消息
result = MoveCircle.Result()
result.finish = True
return result
Result() 方法中的 finish 属性从何而来?来自于刚才讲的自定义的动作接口(.action 文件),这里的 finish 也可以是 success、ok等。
回调函数(execute_callback)的第二个参数 goal_handle 是一个 ROS2 的 ServerGoalHandle 类,查看其源码可以看到 publish_feedback() 和 success() 等函数方法的具体实现。
6.2 反馈(Feedback)
动作服务器中的这行代码即为反馈信。同样它的 state 属性也是 .action 文件里定义好的。
# 创建一个动作反馈信息的消息
feedback_msg = MoveCircle.Feedback()
...
self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)
6.3 目标(Goal)
先创建一个动作客户端 ActionClient ,并和动作服务器一样,传入一个动作接口。接下来需要定义四个函数,分别是:
- def send_goal(self, seconds): 等待服务端准备好,构造一个
Goal
消息,最后以异步的方式发送 action 请求(并附带下面一个回调函数)。必须实现 - def goal_response_callback(self, future): 处理服务端是否接受目标的响应,再附带下一个回调函数。必须实现
- def get_result_callback(self, future): 处理任务最终的结果返回。必须实现
- def feedback_callback(self, feedback_msg): 实时处理服务端发来的反馈(比如“还剩几秒”)。非必须实现
必须实现的函数,函数名可以不同,但功能结构是必须要有的。
其中 Goal() 函数的属性 enable 是在 .action 文件里预定义的,代表动作开始。
from rclpy.action import ActionClient # ROS2 动作客户端类
from learning_interface.action import MoveCircle # 自定义的圆周运动接口
class MoveCircleActionClient(Node):
def __init__(self, name):
super().__init__(name)
# 创建动作客户端(接口类型、动作名)
self._action_client = ActionClient(self, MoveCircle, 'move_circle')
# 创建一个发送动作目标的函数
def send_goal(self, enable):
goal_msg = MoveCircle.Goal() # 创建一个动作目标的消息
goal_msg.enable = enable
self._action_client.wait_for_server() # 等待动作的服务器端启动
# 异步方式发送动作的目标
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
# 设置一个服务器收到目标之后反馈时的回调函数
self._send_goal_future.add_done_callback(self.goal_response_callback)
# 创建一个服务器收到目标之后反馈时的回调函数
def goal_response_callback(self, future):
goal_handle = future.result() # 接收动作的结果
if not goal_handle.accepted: # 如果动作被拒绝执行
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)') # 动作被顺利执行
# 异步获取动作最终执行的结果反馈
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback) # 设置一个收到最终结果的回调函数
# 创建一个收到最终结果的回调函数
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {%d}' % result.finish)
# 创建处理周期反馈消息的回调函数
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback # 读取反馈的数据
self.get_logger().info('Received feedback: {%d}' % feedback.state)
上述回调函数(xxx_callback)中的第二个参数涉及的 future.result() 即为 ROS2 的 ClientGoalHandle 类。
虽然 future.result() 为 ROS2 的 ClientGoalHandle 类,但是future.result().result 为自定义的 result消息体,也就是等价于 MoveCircle.result() 。
七、通信接口
ROS2 中,“接口”有非常具体的含义,包括三种类型:
接口类型 | 用在哪 | 文件格式 | 举例 |
---|---|---|---|
消息(Message) | Topic | .msg |
std_msgs/String.msg |
服务(Service) | Service | .srv |
example_interfaces/AddTwoInts.srv |
动作(Action) | Action | .action |
example_interfaces/MoveCircle.action |
上述的三种文件格式是告诉你,我这个 Topic / Service / Action 需要啥格式的数据,你必须按这个格式来跟我通信。而具体的格式如下:
上述的接口(.msg
/ .srv
/ .action
)是纯文本格式的“接口描述文件”,你写完之后,系统还不能直接用。至少要通过 colcon build
进行编译生成对应的 Python 或 C++ 文件,这些文件才能在你的节点代码中导入和使用。具体步骤如下:
7.1 写接口文件
my_package/
├── srv/
│ └── AddTwoInts.srv
├── msg/
│ └── MyMsg.msg
├── action/
│ └── Countdown.action
7.2 声明你的接口文件类型(在 CMakeLists.txt
)
这一步就是关键,它告诉编译器:“喂,这些接口文件要转成代码!”
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/MyMsg.msg"
"srv/AddTwoInts.srv"
"action/Countdown.action"
)
7.3 配置 package.xml
(声明依赖)
这一步的意思是:编译 & 运行时都要依赖“接口生成器”。
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<build_depend>rosidl_default_runtime</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
7.4 执行 colcon build
colcon build --packages-select my_package
这一步,ROS 2 会自动:
读取你的
.msg
/.srv
/.action
调用内部的
rosidl
工具链自动生成对应语言(Python 或 C++)的接口类(包括序列化、类型定义、服务调用封装等)
放到
build/
、install/
、log/
目录中
当编译结束后,就会在 install/ 目录下生成这几个文件:
install/my_package/lib/python3.10/site-packages/my_package/srv/AddTwoInts.py
install/my_package/lib/python3.10/site-packages/my_package/msg/MyMsg.py
install/my_package/lib/python3.10/site-packages/my_package/action/Countdown.py
例如,AddTwoInts 就会按照 .srv 文件定义的那样生成三个类(真实的 AddTwoInts.py 会比这个复杂的多)。
class AddTwoInts_Request:
a: int
b: int
class AddTwoInts_Response:
sum: int
class AddTwoInts:
Request = AddTwoInts_Request
Response = AddTwoInts_Response
7.5 使用
from my_package.srv import AddTwoInts
req = AddTwoInts.Request()
req.a = 3
req.b = 5
八、参数
ROS 2 中的“参数”是非常实用的功能,能让你的节点在运行时更加灵活、可配置,不需要重新改代码就能修改行为。
8.1 创建参数
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
# 声明参数,带默认值
self.declare_parameter('speed', 0.5)
# 获取参数值
self.speed = self.get_parameter('speed').get_parameter_value().double_value
self.get_logger().info(f"当前速度参数为:{self.speed}")
8.2 设置参数
有以下几种常见的设置参数方法:
- 在代码运行时设置具体参数:
ros2 run my_package my_node_exe --ros-args -p speed:=1.2
- 在 YAML 文件中批量配置:
my_node:
ros__parameters:
speed: 1.5
mode: "auto"
ros2 run my_package my_node_exe --ros-args --params-file config.yaml
- 动态修改
ros2 param list # 查看当前参数列表
ros2 param get /my_node speed # 查看某个参数
ros2 param set /my_node speed 2.0 # 设置某个参数为具体值
九、总结
以上就是从工作区与包、节点概念,到 Topic/Service/Action 通信、接口生成、异步回调,以及运行时参数管理的全流程。
更多推荐
所有评论(0)