系列概述

临近本科毕业,考虑到未来读研的方向以及自己的兴趣方向,我选择的课题大致为“基于VLA结构的指令驱动式机械臂仿真系统的实现”。

为什么说是VLA(Vision-Language-Action)“结构”,因为就目前而言,我认为在目前剩下的几个月时间内从0实现一个正儿八经的VLA模型,所需要的时间、资金、模型资源的获取都是比较麻烦的。因此,我选择使用ROS1+LLM+视觉算法来实现一个“伪VLA”结构,就目前阶段(开题一个月)而言,我能给出的场景示意如下:

场景:指令输入“机械臂将蓝色方块夹起放到红色圆柱体上面”,系统接收指令后,先通过视觉模块确定当前系统中各个物体的坐标,再通过开源大模型,结合已知坐标信息,通过预设的prompt生成动作序列,作为参数送入ROS1架构下的启动文件中,实现动作行为在gazebo下的仿真。

当前阶段我能给出粗糙的逻辑示意图如下:

接下来,我将给出目前阶段我所计划的步骤实现,之后该系列的博客都会依照下面的框架进行更新。由于我也是初次入门ROS以及深度学习相关的内容,所以本系列博客更多的充当学习笔记的作用,在书写过程中难免会出现错误以及天真的理论理解,还请各位指正。

我将该项目的实现分成下面几个步骤(每个步骤下的博客会一步一步地更新):

1. 实现机械臂在ROS1+Gazebo环境下的控制、仿真。目标效果是给出任意坐标的方块,机械臂要能稳定的抓取,并放置到指定的坐标。

该步骤博客目录如下:

https://blog.csdn.net/m0_75114363/article/details/156164226?spm=1001.2014.3001.5501
https://blog.csdn.net/m0_75114363/article/details/156166592?spm=1001.2014.3001.5502
https://blog.csdn.net/m0_75114363/article/details/156426200?spm=1001.2014.3001.5501
https://blog.csdn.net/m0_75114363/article/details/156544524?spm=1001.2014.3001.5502

2. 加入视觉模块与算法。目标效果是在仿真环境下,对于随意放置的方块,视觉系统需要计算出其真实坐标给予机械臂控制模块,使得机械臂能够实现对其的抓取与放置。

该步骤博客目录如下:

https://blog.csdn.net/m0_75114363/article/details/156641634?spm=1001.2014.3001.5502

3. 加入LLM(本地部署或使用API)。目标效果是对于输入的任意文本指令,LLM能根据预设的prompt,结合视觉系统给予的信息,给予执行模块对应的动作序列,使得机械臂正确地实现输入的文本指令想达到的效果,实现V-L-A的完整交互。

该步骤博客目录如下:

4. 实现整体系统的优化与完善,包括基于QT搭建软件前端、优化模型外观、加入更复杂的机械臂、实现更复杂的指令解析与运行。

该步骤博客目录如下:

我所使用的环境如下:

1. 系统:Ubuntu20.04

2. ROS1:Noetic

项目地址:

https://github.com/Dukiyaaa/Cmd2Action_ROS1

此外,额外说明一下为什么这个项目会选择ROS1做框架而不是更现代的ROS2。其实我在初期也是用的ROS2的框架,但发现机械臂夹爪始终无法夹起物体,网上相关的机械臂开源资料基本都是基于ROS1的,加上ROS1提供了grasp_fix插件可防止物体掉落,所以最终我选择了ROS1做项目框架,同时也希望自己在后期能够在ROS2上成功迁移项目。


章节概述

在之前的文章【机械臂】【视觉】二、基于YOLOv8实现物体大致坐标定位-CSDN博客中,我基于YOLOv8实现了相机拍摄物体后的视觉定位。在这一章节中,我将实现Agent模块,其担当的角色为:接受llm模块解析用户指令后的输出,查阅视觉模块的信息,向控制器模块发送控制指令。

目前,几个模块之间的架构图和时序图大致设计如下:


1.视觉模块与Agent模块的通信设计

简单来讲,这两部分的通信需求为:视觉模块通过模型推理得到场景上各个物体的类别和坐标后,需要通过话题发布出去,Agent模块通过订阅该话题,可以实时知晓场景上的视觉信息,用于接受LLM指令后进行视觉查阅、做进一步的指令封装,用于发给控制模块。

1.1.通信msg设计

在src/arm_vision下,新建msg文件夹,加入如下两个msg文件

DetectedObject.msg

# 检测到的物体信息
# 包含物体的3D位姿、类别ID和置信度

geometry_msgs/PoseStamped pose
int32 class_id
float32 confidence

DetectedObjectPool.msg

# 检测到的物体池

# 整批数据的全局时间戳和 frame_id
Header header 
DetectedObject[] objects

简单来说,我先定义了一个单个物体的信息msg,其中包含检测到的物体类别id(0代表blue_box,1代表green_cylinder)、坐标信息、置信度,为的是让Agent拿到相关信息时,可以知道是什么东西,以及这个东西的坐标是多少。

接着,我定义了一个物体池,可以列表性的发送所有检测到的物体信息,可以让Agent一次通信拿到场上所有物体的信息。

最后,在cmakelist和package.xml中加入对应的消息生成代码和依赖:

cmakelist.txt

package.xml

1.2.业务逻辑代码实现

my_vision.py

self.detected_objects_pub = rospy.Publisher('/detected_objects', DetectedObjectPool, queue_size=10)
for box, score, cls_id in zip(xyxy, scores, classes):
            if score < self.conf_thres:
                rospy.loginfo(f"Score {score} less than conf_thres")
                continue
            
            # 取中心点
            u = int((box[0] + box[2]) / 2)
            v = int((box[1] + box[3]) / 2)

            # rospy.loginfo(f"u: {u}, v: {v}")
            if u < 0 or v < 0 or u >= depth.shape[1] or v >= depth.shape[0]:
                continue
            
            # 通过取出来的坐标去深度图里匹配对应的深度值
            # depth_value = depth[v, u]
            depth_value = 1.25
            point = self.pixel_to_world_coordinate(u, v, depth_value)
            rospy.loginfo(f"Pixel to World coords: {point}")
            if point is None:
                continue

            # 创建自定义消息,包含位姿、类别ID和置信度
            detected_obj = DetectedObject()
            detected_obj.pose = PoseStamped()
            if self.depth_header:
                detected_obj.pose.header = self.depth_header
            else:
                detected_obj.pose.header.stamp = rospy.Time.now()
                detected_obj.pose.header.frame_id = 'world'
            detected_obj.pose.pose.position.x, detected_obj.pose.pose.position.y, detected_obj.pose.pose.position.z = point
            detected_obj.pose.pose.orientation.w = 1.0 # 朝向默认不变
            detected_obj.class_id = int(cls_id)
            detected_obj.confidence = float(score)
            # self.detected_objects_pub.publish(detected_obj)
            detections.append((box, score, cls_id)) # 用于可视化,保存完整的检测信息
            detected_objects.append(detected_obj)

        if detected_objects:
            pool_msg = DetectedObjectPool()
            pool_msg.header.stamp = rospy.Time.now()
            pool_msg.header.frame_id = 'world'
            pool_msg.objects = detected_objects
            self.detected_objects_pub.publish(pool_msg)

在视觉模块中,对每一次推理得到的结果,都按照DetectedObjectPool.msg的格式统一封装,在detected_objects话题上发布。

arm_application/src/task_agent.py

class ObjectDetector:
    def __init__(self):
        self.detected_objects = {}  # {class_id (int): (x, y, z)}
        self.sub = rospy.Subscriber('/detected_objects', DetectedObjectPool, self._callback)
        rospy.loginfo('[ObjectDetector] 初始化完成,等待视觉数据...')

    def _callback(self, msg):
        self.detected_objects.clear()
        for obj in msg.objects:
            self.detected_objects[obj.class_id] = (
                obj.pose.pose.position.x,
                obj.pose.pose.position.y,
                obj.pose.pose.position.z
            )

    def get_position(self, class_id):
        return self.detected_objects.get(class_id)

在agent模块中,订阅detected_objects话题,将获取到的物体信息放入缓冲区,需要注意的是,如果场上有多个相同类别的物体,缓冲区只会保留最新的一个的坐标,这个在完成整体demo后,会做改进。

接着,借助于字典结构体的性质,封装get_position,基于类别id做坐标查询。


2.LLM模块与Agent模块的通信设计

简单来讲,这两部分的通信需求为:LLM模块接受用户输入的文字指令后,将其封装为ros msg,格式大致为“动作+类别id”,通过话题发送出去,Agent模块订阅相应的话题,解析对应的命令,查阅视觉信息,再进一步封装给控制模块的指令,通过服务发送请求。

2.1.通信msg设计

在src/arm_application下,新建msg文件夹,加入如下msg文件

LLMCommands.msg

# LLMCommands.msg
string action_type    # "pick", "place", "pick_place"
int32 target_class_id # -1 表示无目标(如 home)
string target_name    # 可选:"red_block"(用于日志/调试)

就目前阶段而言,定义的msg内容较为简单,包含动作类型(目前验证阶段限定为pick_place),目标分类id以及目标的名字(这个属性目前还没用上)。LLM模块根据用户的命令输入进行解析后,输出要做的动作以及动作对应的目标物体,通过话题发布出去。agent模块订阅话题获取此msg后,通过视觉模块查找目标物体,随后将动作类型及目标坐标一起发给执行模块。

2.1.业务逻辑代码实现

task_agent.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent 模块:接收 LLM 指令,查询视觉数据,执行 pick-and-place 任务
"""

import rospy
from arm_vision.msg import DetectedObjectPool
from geometry_msgs.msg import PoseStamped
from arm_application.srv import AgentCommands
from arm_application.msg import LLMCommands
import sys
import os

# 添加源代码目录到 Python 路径
script_dir = os.path.dirname(os.path.abspath(__file__))
if script_dir not in sys.path:
    sys.path.insert(0, script_dir)

class ObjectDetector:
    def __init__(self):
        self.detected_objects = {}  # {class_id (int): (x, y, z)}
        self.sub = rospy.Subscriber('/detected_objects', DetectedObjectPool, self._callback)
        rospy.loginfo('[ObjectDetector] 初始化完成,等待视觉数据...')

    def _callback(self, msg):
        self.detected_objects.clear()
        for obj in msg.objects:
            self.detected_objects[obj.class_id] = (
                obj.pose.pose.position.x,
                obj.pose.pose.position.y,
                obj.pose.pose.position.z
            )

    def get_position(self, class_id):
        return self.detected_objects.get(class_id)

class TaskAgent:
    def __init__(self):
        # 初始化视觉检测器(单例)
        self.detector = ObjectDetector()
        
        # 等待控制器服务 用于agent向控制器发命令
        rospy.loginfo("等待 /execute_task 服务...")
        rospy.wait_for_service('/execute_task')
        self.task_client = rospy.ServiceProxy('/execute_task', AgentCommands)
        
        # 订阅 LLM 指令 用于llm向agent发解析后的需求
        self.sub = rospy.Subscriber('/llm_commands', LLMCommands, self._llm_callback)
        rospy.loginfo("Agent 已启动,等待 LLM 指令...")

    def _llm_callback(self, msg):
        rospy.loginfo("收到 LLM 指令: action='%s', class_id=%d" % (msg.action_type, msg.target_class_id))
        
        # 仅处理 pick_place 类型(可根据需要扩展)
        if msg.action_type != "pick_place":
            rospy.logwarn("不支持的动作类型: %s" % msg.action_type)
            return

        # 查询目标物体位置
        pos = self.detector.get_position(msg.target_class_id)
        if pos is None:
            rospy.logerr("未检测到 class_id=%d 的物体!" % msg.target_class_id)
            return

        x, y, z = pos
        rospy.loginfo("目标位置: (%.3f, %.3f, %.3f)" % (x, y, z))

        # 构造 pick 和 place 位姿
        pick_pose = self._create_pose(x, y, z)
        
        # 安全放置位置
        place_pose = self._create_pose(0.0, -1.8, 0.05)

        # 发送任务请求
        try:
            resp = self.task_client(
                target_pose=[pick_pose, place_pose],
                action_type="pick_place"
            )
            if resp.success:
                rospy.loginfo("Pick-and-place 任务成功!")
            else:
                rospy.logerr("任务失败: %s" % resp.message)
        except rospy.ServiceException as e:
            rospy.logerr("服务调用异常: %s" % str(e))

    def _create_pose(self, x, y, z, frame_id="world"):
        pose = PoseStamped()
        pose.header.frame_id = frame_id
        pose.pose.position.x = x
        pose.pose.position.y = y
        pose.pose.position.z = z
        pose.pose.orientation.w = 1.0
        return pose


def main():
    rospy.init_node('task_agent')
    agent = TaskAgent()
    rospy.spin()  # 保持节点运行,监听 LLM 指令


if __name__ == '__main__':
    main()

当前阶段的代码内容较为简单,仅解析pick_place一个命令(因为执行模块目前只实现了这一个函数),通过视觉获取目标坐标,并将放置位置固定为(0.0, -1.8, 0.05)。这样设计的作用是为了初期验证。


3.Agent模块与控制模块的通信设计

与之前的通信设计不同的是,Agent与控制模块采用的是服务,即Agen的控制请求会得到控制模块的反馈。后面会考虑把LLM与Agent之间的通信也设计为服务,这样可以打通LLM、Agent、控制模块之前的反馈通路。

3.1.通信srv设计

在src/arm_application下,新建srv文件夹,加入如下srv文件

AgentCommands.srv

geometry_msgs/PoseStamped[] target_pose
string action_type   # e.g. "pick", "place"
---
bool success
string message

Agent只向控制模块发送动作类型和目标坐标。

注意,这里的目标坐标为一个列表,第0项表示起始坐标,第1项表示终点坐标,在pick_place任务中,pick坐标便为起始坐标,place坐标便为终点坐标。

3.1.业务逻辑代码实现

scara_controller.py

# src/scara_controller.py
#!/usr/bin/env python3

import sys
import os

# 添加源代码目录到 Python 路径
script_dir = os.path.dirname(os.path.abspath(__file__))
if script_dir not in sys.path:
    sys.path.insert(0, script_dir)

import rospy
from arm_application.srv import AgentCommands, AgentCommandsResponse
from geometry_msgs.msg import PoseStamped
from my_scara_action import ArmController

arm_controller = None
def handle_task(req):
    # rospy.loginfo(f"收到任务: action='{req.action_type}', pos=({req.target_pose.pose.position.x:.2f}, {req.target_pose.pose.position.y:.2f}, {req.target_pose.pose.position.z:.2f})")
    action = req.action_type
    
    if action == "pick":
        target_x = req.target_pose[0].pose.position.x
        target_y = req.target_pose[0].pose.position.y
        target_z = req.target_pose[0].pose.position.z
        arm_controller.pick_and_place([target_x, target_y, target_z], [0.0, -1.8, 0.05 + 0.032])
        arm_controller.arm_reset()
    elif action == "place":
        target_x = req.target_pose[0].pose.position.x
        target_y = req.target_pose[0].pose.position.y
        target_z = req.target_pose[0].pose.position.z
        arm_controller.pick_and_place([target_x, target_y, target_z], [0.0, -1.8, 0.05 + 0.032])
        arm_controller.arm_reset()
    elif action == "pick_place":
        pick_x = req.target_pose[0].pose.position.x
        pick_y = req.target_pose[0].pose.position.y
        pick_z = req.target_pose[0].pose.position.z
        place_x = req.target_pose[1].pose.position.x
        place_y = req.target_pose[1].pose.position.y
        place_z = req.target_pose[1].pose.position.z
        arm_controller.pick_and_place([pick_x, pick_y, pick_z], [place_x, place_y, place_z + 0.032])
        arm_controller.arm_reset()
    return AgentCommandsResponse(success=True, message="Mock success")

if __name__ == '__main__':
    rospy.init_node('scara_controller')
    arm_controller = ArmController()
    s = rospy.Service('/execute_task', AgentCommands, handle_task)
    rospy.loginfo("控制器已启动,等待任务...")
    rospy.spin()

控制模块通过字符匹配动作类型,通过发布下来的坐标做对应的动作,最后反馈。


4.下一步的优化

可以发现,对于目前的系统而言,整体设计是比较粗糙的,目前只做了pick_palce命令的解析,通过命令行输入:

rostopic pub /llm_commands arm_application/LLMCommands "action_type: 'pick_place' target_class_id: 0"

各个模块做了对应的行动,在gazebo世界中成功将方块抓起并放置,实现了整体链路的验证。

下一步,我将做以下几点优化:

1.加入更多的指令,比如单独的pick、place

2.支持LLM也加入坐标参数,即LLM输出的指令不再只包含动作和目标,也可以加入坐标,这样便可以实现"把蓝色方块放到(0.1,-1.6,0.05)去"这样的命令解析。

3.将LLM与Agent之间的通信也改为服务通信,加入反馈。

4.基于第2点,考虑将当前的agent与llm合并,形成一个传统意义上的大模型agent,虽然这样可能会增多api调用的次数。


总结

实现了LLM、Agent、控制器之间的基本通信架构demo,完成了验证,下一步是在当前基础上做优化,丰富系统功能及健壮性。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐