一、工作空间

1.1 ROS2 的典型目录结构

ros2/
├── build/       # 编译中间文件(colcon自动生成)
├── install/     # 安装后的文件(可运行的节点等)
├── log/         # 构建日志
└── src/         # 你的源代码包都放在这里
  • src:代码空间。未来编写的代码、脚本,都需要人为的放置到这里;
  • build:编译空间。保存编译过程中产生的中间文件;
  • install:安装空间。放置编译得到的可执行文件和脚本;
  • log:日志空间。编译和运行过程中,保存各种警告、错误、信息等日志。

1.2 安装项目依赖

        创建工作空间(ros2)和代码空间(src),然后在代码空间下放置一个项目。不管是什么项目,一定会需要各种依赖。我们可以手动一个一个安装,也可以使用 rosdep 工具自动安装。但是国内使用 rosdep 会超时。因此国内大佬“鱼香ROS”基于 rosdep 源码制作了 rosdepc,专门服务国内ROS用户。本文之后,世上再无rosdep更新失败问题!https://zhuanlan.zhihu.com/p/398754989        使用方法如下:

$ cd ros2/src
$ sudo pip3 install rosdepc
$ sudo rosdepc init
$ rosdepc update

$ cd ..
$ rosdepc install -i --from-path src --rosdistro jazzy -y  # jazzy为ROS2的版本

1.3 生成工作空间

        安装依赖后,就可以编译了。

$ cd ros2  
$ colcon build  

1.4 设置环境变量

        编译成功后,为了让系统能够找到我们的功能包和可执行文件,还需要设置环境变量。

$ source install/local_setup.sh # 仅在当前终端生效

        为了不用每次打开终端都输入这条指令,可以在 /home 文件夹下找到隐藏文件(ctrl + h).bashrc。在文件末尾添加刚才环境变量地址。

二、功能包 

        如果把 工作区(workspace) 想成一个 大工地,而 功能包(package) 则是里面的 一个个工棚/模块。比如:

ros2_ws/          ← 工作区
├── src/
│   ├── lidar_driver/       ← 包1:激光雷达驱动
│   ├── nav_system/         ← 包2:导航系统
│   └── my_robot/           ← 包3:机器人本体代码
├── build/                  ← 编译缓存
├── install/                ← 可执行文件/库等
└── log/                    ← 编译日志

        我们把不同功能的代码划分到不同的功能包中,尽量降低他们之间的耦合关系,当需要在ROS社区中分享给别人的时候,只需要说明这个功能包该如何使用,别人很快就可以用起来了。所以功能包的机制,是提高ROS中软件复用率的重要方法之一。

2.1 创建功能包

$ ros2 pkg create --build-type <build-type> <package_name>
  • build-type:表示新创建的功能包是C++还是Python的,如果使用C++或者C,那这里就跟ament_cmake,如果使用Python,就跟ament_python;
  • package_name:新建功能包的名字。

        比如:

$ ros2 pkg create --build-type ament_python pkg_python

2.2 编译功能包

        同理,功能包的代码也需要在工作空间编译后才能运行。编译后,可以用如下命令看是否编译成功:

$ ros2 pkg list | grep [包名]

三、节点

        上面我们提到,功能包像是工地的工棚,那么节点就相当于工棚里的不同职责的工人

        举个简单的例子:

import rclpy
from rclpy.node import Node

class MyNode(Node):
    def __init__(self):
        super().__init__('my_node')
        self.get_logger().info("节点启动成功!")

def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)   # 持续运行,等待事件
    node.destroy_node()  # 销毁节点对象
    rclpy.shutdown()  # 关闭ROS2 Python接口

        其中 rclpy 为节点的 python 依赖,如果是 C++,则使用 rclcpp 。其中导入的 Node 类是一个基类,可以添加 Topic、Service、Action 与其他节点通信。

        之后所有的程序在编写完代码后,都要在 setup.py 中注册节点信息,即在 console_scripts 中添加节点:

from setuptools import setup

package_name = 'pkg_name'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='your_name',
    maintainer_email='your_email@example.com',
    description='Simple node example',
    license='Apache License 2.0',
    entry_points={
        'console_scripts': [
            'my_node = pkg_name.my_node:main',
        ],
    },
)

        最后,使用如下命令运行:

$ ros2 run [包名] [节点名]  # 包名:pkg_name; 节点名:my_node

        上面的示例代码的节点名'my_node' 。运行以后,可以查看当前所有运行的节点:

$ ros2 node list               # 查看节点列表
$ ros2 node info <node_name>   # 查看节点信息

四、话题(Topic)

        如果节点想要发送/接收数据(发布/订阅),那么就需要话题(Topic)。之前说到,节点 Node 类似于工棚里的工人,那么话题就相当于工人的对讲机,因此节点之间的 topic 通信是单向传播的。

4.1 发布者

        编写一个名为 'talker' 的发布者节点,话题为 'chatter'

class TalkerNode(Node):
    def __init__(self):
        super().__init__('talker')

        # 创建发布者对象(消息类型、话题名、队列长度)
        self.publisher_ = self.create_publisher(String, 'chatter', 10)  

        # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
        self.timer = self.create_timer(1.0, self.timer_callback)  
        self.count = 0

    def timer_callback(self):
        msg = String()
        msg.data = f"Hello ROS2: {self.count}"
        self.publisher_.publish(msg)
        self.get_logger().info(f"Published: {msg.data}")
        self.count += 1

4.2 订阅者

        订阅者节点通过话题名 'chatter' 获取发布者节点的消息(msg)。

class ListenerNode(Node):
    def __init__(self):
        super().__init__('listener')

        # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.listener_callback,
            10
        )

    def listener_callback(self, msg):
        self.get_logger().info(f"Received: {msg.data}")

        编写好代码后,要在 setup.py 中的 console_scripts 添加这两行节点信息:

'talker = py_pubsub.talker_node:main',
'listener = py_pubsub.listener_node:main',

注意: 节点名为talker,文件名为talker_node.py。

$ ros2 run py_pubsub talker       # 正确
$ ros2 run py_pubsub talker_node  # 错误

五、服务(Service)

        如果节点需要请求服务,然后等待结果(客户端/服务端),那么就需要服务(Service)。 topic 类似于对讲机,那么服务则类似于电话,你问我答,一种双向通信

5.1 服务器(Server)

        创建一个 'add_two_ints_server' 的服务器节点,一个 'add_two_ints' 的服务。AddTwoInts 来自 ROS2 的内置的一个服务接口包(example_interfaces),专门用来做加法演示或测试。

from example_interfaces.srv import AddTwoInts

class AddTwoIntsServer(Node):
    def __init__(self):
        super().__init__('add_two_ints_server')

        # 创建服务器对象(接口类型、服务名、服务器回调函数)
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_callback)

        self.get_logger().info('加法服务已启动')

    # 回调函数名、参数名可以任取,但参数的个数、含义不变
    def add_callback(self, request, response):
        self.get_logger().info(f'收到请求: {request.a} + {request.b}')
        response.sum = request.a + request.b
        return response

def main(args=None):
    rclpy.init(args=args)
    node = AddTwoIntsServer()
    rclpy.spin(node)
    rclpy.shutdown()

5.2 客户端(Client)

from example_interfaces.srv import AddTwoInts

class AddTwoIntsClient(Node):
    def __init__(self):
        super().__init__('add_two_ints_client')

        # 创建服务客户端对象(服务接口类型,服务名)
        self.client = self.create_client(AddTwoInts, 'add_two_ints')

        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('等待服务端上线...')

        self.request = AddTwoInts.Request()
        self.request.a = 7
        self.request.b = 5
        
        # 异步方式发送服务请求
        self.future = self.client.call_async(self.request)

def main(args=None):
    rclpy.init(args=args)
    node = AddTwoIntsClient()

    while rclpy.ok():
        rclpy.spin_once(node)
        if node.future.done():
            try:
                response = node.future.result()
                node.get_logger().info(f'结果是: {response.sum}')
            except Exception as e:
                node.get_logger().error(f'服务调用失败: {e}')
            break

    node.destroy_node()
    rclpy.shutdown()

        运行后,可以通过如下命令查看服务:

$ ros2 service list                  # 查看服务列表
$ ros2 service type <service_name>   # 查看服务数据类型
$ ros2 service call <service_name> <service_type> <service_data>   # 发送服务请求

六、动作(Action)

        Service 适合“立刻完成的操作”,而动作(Action)适合“耗时的、可取消的过程”。 比如:获取当前温度、启动一个动作,就可以用 Service,但是移动机械臂、走一段路径、上传一个文件必须用 Action。

        从下图可以看出,Action 是基于 Service 和 Topic 来完成的。

  • Goal Service:相当于 Client。需要客户端发出目标,当然也可以取消目标。
  • Result Service:相当于 Server。服务器端拿到客户端发送的目标,分配 ID。如果客户端取消了发送,服务器端也应该返回是否取消成功。
  • Feedback Topic: 一种由服务器到客户端的单向 Topic。包含当前所有目标任务的状态(比如:接受中、执行中、已完成)。

6.1 结果(Result)

        首先,服务器需要创建动作服务器 ActionServer ,动作服务器需要传入一个回调函数 execute_callback 来处理客户端发来的目标动作。MoveCircle 是一个自定义的动作接口,ROS2 根据自定义的动作接口(.action 文件)会编译生成动作需要的 Goal()Feedback()Result() 相关方法。其中的 Result() 方法就是这里谈的结果

from rclpy.action import ActionServer             # ROS2 动作服务器类
from learning_interface.action import MoveCircle  # 自定义的圆周运动接口

class MoveCircleActionServer(Node):
    def __init__(self, name):
        super().__init__(name)  

        # 创建动作服务器(接口类型、动作名、回调函数)                 
        self._action_server = ActionServer(      
            self,
            MoveCircle,
            'move_circle',
            self.execute_callback)

    # 执行收到动作目标之后的处理函数
    def execute_callback(self, goal_handle):
        # 创建一个动作反馈信息的消息            
        feedback_msg = MoveCircle.Feedback()            
        
        # 从0到360度,执行圆周运动,并周期反馈信息
        for i in range(0, 360, 30):                     
            feedback_msg.state = i     # 创建反馈信息,表示当前执行到的角度
            self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)
            goal_handle.publish_feedback(feedback_msg)  # 发布反馈信息
            time.sleep(0.5)
         
        # 动作执行成功
        goal_handle.succeed()      
        
        # 创建结果消息             
        result = MoveCircle.Result()                    
        result.finish = True                            
        return result

         Result() 方法中的 finish 属性从何而来?来自于刚才讲的自定义的动作接口(.action 文件),这里的 finish 也可以是 success、ok等。

        回调函数(execute_callback)的第二个参数 goal_handle 是一个 ROS2 的 ServerGoalHandle 类,查看其源码可以看到 publish_feedback() 和 success() 等函数方法的具体实现。

6.2 反馈(Feedback)

        动作服务器中的这行代码即为反馈信。同样它的 state 属性也是 .action 文件里定义好的。

# 创建一个动作反馈信息的消息            
feedback_msg = MoveCircle.Feedback()  

...

self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)

6.3 目标(Goal)

        先创建一个动作客户端 ActionClient ,并和动作服务器一样,传入一个动作接口。接下来需要定义四个函数,分别是:

  • def send_goal(self, seconds):  等待服务端准备好,构造一个 Goal 消息,最后以异步的方式发送 action 请求(并附带下面一个回调函数)。必须实现
  • def goal_response_callback(self, future): 处理服务端是否接受目标的响应,再附带下一个回调函数。必须实现
  • def get_result_callback(self, future): 处理任务最终的结果返回。必须实现
  • def feedback_callback(self, feedback_msg):  实时处理服务端发来的反馈(比如“还剩几秒”)。非必须实现

         必须实现的函数,函数名可以不同,但功能结构是必须要有的。

        其中  Goal() 函数的属性 enable 是在 .action 文件里预定义的,代表动作开始。

from rclpy.action import ActionClient             # ROS2 动作客户端类
from learning_interface.action import MoveCircle  # 自定义的圆周运动接口

class MoveCircleActionClient(Node):
    def __init__(self, name):
        super().__init__(name)   
        
        # 创建动作客户端(接口类型、动作名)                
        self._action_client = ActionClient(self, MoveCircle, 'move_circle') 

    # 创建一个发送动作目标的函数
    def send_goal(self, enable):                 
        goal_msg = MoveCircle.Goal()             # 创建一个动作目标的消息
        goal_msg.enable = enable                 
        self._action_client.wait_for_server()    # 等待动作的服务器端启动
        
        # 异步方式发送动作的目标
        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)                   
                        
        # 设置一个服务器收到目标之后反馈时的回调函数  
        self._send_goal_future.add_done_callback(self.goal_response_callback) 
    
    # 创建一个服务器收到目标之后反馈时的回调函数
    def goal_response_callback(self, future):          
        goal_handle = future.result()                   # 接收动作的结果
        if not goal_handle.accepted:                    # 如果动作被拒绝执行
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')                            # 动作被顺利执行
        
        # 异步获取动作最终执行的结果反馈
        self._get_result_future = goal_handle.get_result_async()              
        self._get_result_future.add_done_callback(self.get_result_callback)   # 设置一个收到最终结果的回调函数 

    # 创建一个收到最终结果的回调函数
    def get_result_callback(self, future):                                    
        result = future.result().result                                       
        self.get_logger().info('Result: {%d}' % result.finish)                

    # 创建处理周期反馈消息的回调函数
    def feedback_callback(self, feedback_msg):                                
        feedback = feedback_msg.feedback                                      # 读取反馈的数据
        self.get_logger().info('Received feedback: {%d}' % feedback.state) 

        上述回调函数(xxx_callback)中的第二个参数涉及的 future.result() 即为 ROS2 的 ClientGoalHandle 类。

         虽然 future.result() 为 ROS2 的 ClientGoalHandle 类,但是future.result().result 为自定义的 result消息体,也就是等价于 MoveCircle.result() 。

七、通信接口

        ROS2 中,“接口”有非常具体的含义,包括三种类型:

接口类型 用在哪 文件格式 举例
消息(Message) Topic .msg std_msgs/String.msg
服务(Service) Service .srv example_interfaces/AddTwoInts.srv
动作(Action) Action .action example_interfaces/MoveCircle.action

        上述的三种文件格式是告诉你,我这个 Topic / Service / Action 需要啥格式的数据,你必须按这个格式来跟我通信。而具体的格式如下:

        上述的接口(.msg / .srv / .action)是纯文本格式的“接口描述文件”,你写完之后,系统还不能直接用。至少要通过 colcon build 进行编译生成对应的 Python 或 C++ 文件,这些文件才能在你的节点代码中导入和使用。具体步骤如下:

7.1 写接口文件

my_package/
├── srv/
│   └── AddTwoInts.srv
├── msg/
│   └── MyMsg.msg
├── action/
│   └── Countdown.action

7.2 声明你的接口文件类型(在 CMakeLists.txt

        这一步就是关键,它告诉编译器:“喂,这些接口文件要转成代码!”

find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/MyMsg.msg"
  "srv/AddTwoInts.srv"
  "action/Countdown.action"
)

 7.3 配置 package.xml(声明依赖)

        这一步的意思是:编译 & 运行时都要依赖“接口生成器”。

<buildtool_depend>rosidl_default_generators</buildtool_depend>
<build_depend>rosidl_default_runtime</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>

7.4 执行 colcon build 

colcon build --packages-select my_package

        这一步,ROS 2 会自动:

  1. 读取你的 .msg / .srv / .action

  2. 调用内部的 rosidl 工具链

  3. 自动生成对应语言(Python 或 C++)的接口类(包括序列化、类型定义、服务调用封装等)

  4. 放到 build/install/log/ 目录中

        当编译结束后,就会在 install/ 目录下生成这几个文件:

install/my_package/lib/python3.10/site-packages/my_package/srv/AddTwoInts.py
install/my_package/lib/python3.10/site-packages/my_package/msg/MyMsg.py
install/my_package/lib/python3.10/site-packages/my_package/action/Countdown.py

        例如,AddTwoInts 就会按照 .srv 文件定义的那样生成三个类(真实的 AddTwoInts.py 会比这个复杂的多)。

class AddTwoInts_Request:
    a: int
    b: int

class AddTwoInts_Response:
    sum: int

class AddTwoInts:
    Request = AddTwoInts_Request
    Response = AddTwoInts_Response

 7.5 使用

from my_package.srv import AddTwoInts

req = AddTwoInts.Request()
req.a = 3
req.b = 5

八、参数

        ROS 2 中的“参数”是非常实用的功能,能让你的节点在运行时更加灵活、可配置,不需要重新改代码就能修改行为。

8.1 创建参数

from rclpy.node import Node

class MyNode(Node):
    def __init__(self):
        super().__init__('my_node')
        
        # 声明参数,带默认值
        self.declare_parameter('speed', 0.5)
        
        # 获取参数值
        self.speed = self.get_parameter('speed').get_parameter_value().double_value

        self.get_logger().info(f"当前速度参数为:{self.speed}")

8.2 设置参数

        有以下几种常见的设置参数方法:

  • 在代码运行时设置具体参数:
ros2 run my_package my_node_exe --ros-args -p speed:=1.2
  • 在 YAML 文件中批量配置:
my_node:
  ros__parameters:
    speed: 1.5
    mode: "auto"
ros2 run my_package my_node_exe --ros-args --params-file config.yaml
  • 动态修改
ros2 param list                      # 查看当前参数列表
ros2 param get /my_node speed        # 查看某个参数
ros2 param set /my_node speed 2.0    # 设置某个参数为具体值

 九、总结

         以上就是从工作区与包、节点概念,到 Topic/Service/Action 通信、接口生成、异步回调,以及运行时参数管理的全流程。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐