【机械臂】【Agent】一、Agent与其它模块的通信设计
在之前的文章【机械臂】【视觉】二、基于YOLOv8实现物体大致坐标定位-CSDN博客中,我基于YOLOv8实现了相机拍摄物体后的视觉定位。在这一章节中,我将实现Agent模块,其担当的角色为:接受llm模块解析用户指令后的输出,查阅视觉模块的信息,向控制器模块发送控制指令。实现了LLM、Agent、控制器之间的基本通信架构demo,完成了验证,下一步是在当前基础上做优化,丰富系统功能及健壮性。
系列概述
临近本科毕业,考虑到未来读研的方向以及自己的兴趣方向,我选择的课题大致为“基于VLA结构的指令驱动式机械臂仿真系统的实现”。
为什么说是VLA(Vision-Language-Action)“结构”,因为就目前而言,我认为在目前剩下的几个月时间内从0实现一个正儿八经的VLA模型,所需要的时间、资金、模型资源的获取都是比较麻烦的。因此,我选择使用ROS1+LLM+视觉算法来实现一个“伪VLA”结构,就目前阶段(开题一个月)而言,我能给出的场景示意如下:
场景:指令输入“机械臂将蓝色方块夹起放到红色圆柱体上面”,系统接收指令后,先通过视觉模块确定当前系统中各个物体的坐标,再通过开源大模型,结合已知坐标信息,通过预设的prompt生成动作序列,作为参数送入ROS1架构下的启动文件中,实现动作行为在gazebo下的仿真。
当前阶段我能给出粗糙的逻辑示意图如下:

接下来,我将给出目前阶段我所计划的步骤实现,之后该系列的博客都会依照下面的框架进行更新。由于我也是初次入门ROS以及深度学习相关的内容,所以本系列博客更多的充当学习笔记的作用,在书写过程中难免会出现错误以及天真的理论理解,还请各位指正。
我将该项目的实现分成下面几个步骤(每个步骤下的博客会一步一步地更新):
1. 实现机械臂在ROS1+Gazebo环境下的控制、仿真。目标效果是给出任意坐标的方块,机械臂要能稳定的抓取,并放置到指定的坐标。
该步骤博客目录如下:
https://blog.csdn.net/m0_75114363/article/details/156164226?spm=1001.2014.3001.5501
https://blog.csdn.net/m0_75114363/article/details/156166592?spm=1001.2014.3001.5502
https://blog.csdn.net/m0_75114363/article/details/156426200?spm=1001.2014.3001.5501
https://blog.csdn.net/m0_75114363/article/details/156544524?spm=1001.2014.3001.5502
2. 加入视觉模块与算法。目标效果是在仿真环境下,对于随意放置的方块,视觉系统需要计算出其真实坐标给予机械臂控制模块,使得机械臂能够实现对其的抓取与放置。
该步骤博客目录如下:
https://blog.csdn.net/m0_75114363/article/details/156641634?spm=1001.2014.3001.5502
3. 加入LLM(本地部署或使用API)。目标效果是对于输入的任意文本指令,LLM能根据预设的prompt,结合视觉系统给予的信息,给予执行模块对应的动作序列,使得机械臂正确地实现输入的文本指令想达到的效果,实现V-L-A的完整交互。
该步骤博客目录如下:
4. 实现整体系统的优化与完善,包括基于QT搭建软件前端、优化模型外观、加入更复杂的机械臂、实现更复杂的指令解析与运行。
该步骤博客目录如下:
我所使用的环境如下:
1. 系统:Ubuntu20.04
2. ROS1:Noetic
项目地址:
https://github.com/Dukiyaaa/Cmd2Action_ROS1
此外,额外说明一下为什么这个项目会选择ROS1做框架而不是更现代的ROS2。其实我在初期也是用的ROS2的框架,但发现机械臂夹爪始终无法夹起物体,网上相关的机械臂开源资料基本都是基于ROS1的,加上ROS1提供了grasp_fix插件可防止物体掉落,所以最终我选择了ROS1做项目框架,同时也希望自己在后期能够在ROS2上成功迁移项目。
章节概述
在之前的文章【机械臂】【视觉】二、基于YOLOv8实现物体大致坐标定位-CSDN博客中,我基于YOLOv8实现了相机拍摄物体后的视觉定位。在这一章节中,我将实现Agent模块,其担当的角色为:接受llm模块解析用户指令后的输出,查阅视觉模块的信息,向控制器模块发送控制指令。
目前,几个模块之间的架构图和时序图大致设计如下:


1.视觉模块与Agent模块的通信设计
简单来讲,这两部分的通信需求为:视觉模块通过模型推理得到场景上各个物体的类别和坐标后,需要通过话题发布出去,Agent模块通过订阅该话题,可以实时知晓场景上的视觉信息,用于接受LLM指令后进行视觉查阅、做进一步的指令封装,用于发给控制模块。
1.1.通信msg设计
在src/arm_vision下,新建msg文件夹,加入如下两个msg文件

DetectedObject.msg
# 检测到的物体信息
# 包含物体的3D位姿、类别ID和置信度
geometry_msgs/PoseStamped pose
int32 class_id
float32 confidence
DetectedObjectPool.msg
# 检测到的物体池
# 整批数据的全局时间戳和 frame_id
Header header
DetectedObject[] objects
简单来说,我先定义了一个单个物体的信息msg,其中包含检测到的物体类别id(0代表blue_box,1代表green_cylinder)、坐标信息、置信度,为的是让Agent拿到相关信息时,可以知道是什么东西,以及这个东西的坐标是多少。
接着,我定义了一个物体池,可以列表性的发送所有检测到的物体信息,可以让Agent一次通信拿到场上所有物体的信息。
最后,在cmakelist和package.xml中加入对应的消息生成代码和依赖:
cmakelist.txt

package.xml

1.2.业务逻辑代码实现
my_vision.py
self.detected_objects_pub = rospy.Publisher('/detected_objects', DetectedObjectPool, queue_size=10)
for box, score, cls_id in zip(xyxy, scores, classes):
if score < self.conf_thres:
rospy.loginfo(f"Score {score} less than conf_thres")
continue
# 取中心点
u = int((box[0] + box[2]) / 2)
v = int((box[1] + box[3]) / 2)
# rospy.loginfo(f"u: {u}, v: {v}")
if u < 0 or v < 0 or u >= depth.shape[1] or v >= depth.shape[0]:
continue
# 通过取出来的坐标去深度图里匹配对应的深度值
# depth_value = depth[v, u]
depth_value = 1.25
point = self.pixel_to_world_coordinate(u, v, depth_value)
rospy.loginfo(f"Pixel to World coords: {point}")
if point is None:
continue
# 创建自定义消息,包含位姿、类别ID和置信度
detected_obj = DetectedObject()
detected_obj.pose = PoseStamped()
if self.depth_header:
detected_obj.pose.header = self.depth_header
else:
detected_obj.pose.header.stamp = rospy.Time.now()
detected_obj.pose.header.frame_id = 'world'
detected_obj.pose.pose.position.x, detected_obj.pose.pose.position.y, detected_obj.pose.pose.position.z = point
detected_obj.pose.pose.orientation.w = 1.0 # 朝向默认不变
detected_obj.class_id = int(cls_id)
detected_obj.confidence = float(score)
# self.detected_objects_pub.publish(detected_obj)
detections.append((box, score, cls_id)) # 用于可视化,保存完整的检测信息
detected_objects.append(detected_obj)
if detected_objects:
pool_msg = DetectedObjectPool()
pool_msg.header.stamp = rospy.Time.now()
pool_msg.header.frame_id = 'world'
pool_msg.objects = detected_objects
self.detected_objects_pub.publish(pool_msg)
在视觉模块中,对每一次推理得到的结果,都按照DetectedObjectPool.msg的格式统一封装,在detected_objects话题上发布。
arm_application/src/task_agent.py
class ObjectDetector:
def __init__(self):
self.detected_objects = {} # {class_id (int): (x, y, z)}
self.sub = rospy.Subscriber('/detected_objects', DetectedObjectPool, self._callback)
rospy.loginfo('[ObjectDetector] 初始化完成,等待视觉数据...')
def _callback(self, msg):
self.detected_objects.clear()
for obj in msg.objects:
self.detected_objects[obj.class_id] = (
obj.pose.pose.position.x,
obj.pose.pose.position.y,
obj.pose.pose.position.z
)
def get_position(self, class_id):
return self.detected_objects.get(class_id)
在agent模块中,订阅detected_objects话题,将获取到的物体信息放入缓冲区,需要注意的是,如果场上有多个相同类别的物体,缓冲区只会保留最新的一个的坐标,这个在完成整体demo后,会做改进。
接着,借助于字典结构体的性质,封装get_position,基于类别id做坐标查询。
2.LLM模块与Agent模块的通信设计
简单来讲,这两部分的通信需求为:LLM模块接受用户输入的文字指令后,将其封装为ros msg,格式大致为“动作+类别id”,通过话题发送出去,Agent模块订阅相应的话题,解析对应的命令,查阅视觉信息,再进一步封装给控制模块的指令,通过服务发送请求。
2.1.通信msg设计
在src/arm_application下,新建msg文件夹,加入如下msg文件

LLMCommands.msg
# LLMCommands.msg
string action_type # "pick", "place", "pick_place"
int32 target_class_id # -1 表示无目标(如 home)
string target_name # 可选:"red_block"(用于日志/调试)
就目前阶段而言,定义的msg内容较为简单,包含动作类型(目前验证阶段限定为pick_place),目标分类id以及目标的名字(这个属性目前还没用上)。LLM模块根据用户的命令输入进行解析后,输出要做的动作以及动作对应的目标物体,通过话题发布出去。agent模块订阅话题获取此msg后,通过视觉模块查找目标物体,随后将动作类型及目标坐标一起发给执行模块。
2.1.业务逻辑代码实现
task_agent.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent 模块:接收 LLM 指令,查询视觉数据,执行 pick-and-place 任务
"""
import rospy
from arm_vision.msg import DetectedObjectPool
from geometry_msgs.msg import PoseStamped
from arm_application.srv import AgentCommands
from arm_application.msg import LLMCommands
import sys
import os
# 添加源代码目录到 Python 路径
script_dir = os.path.dirname(os.path.abspath(__file__))
if script_dir not in sys.path:
sys.path.insert(0, script_dir)
class ObjectDetector:
def __init__(self):
self.detected_objects = {} # {class_id (int): (x, y, z)}
self.sub = rospy.Subscriber('/detected_objects', DetectedObjectPool, self._callback)
rospy.loginfo('[ObjectDetector] 初始化完成,等待视觉数据...')
def _callback(self, msg):
self.detected_objects.clear()
for obj in msg.objects:
self.detected_objects[obj.class_id] = (
obj.pose.pose.position.x,
obj.pose.pose.position.y,
obj.pose.pose.position.z
)
def get_position(self, class_id):
return self.detected_objects.get(class_id)
class TaskAgent:
def __init__(self):
# 初始化视觉检测器(单例)
self.detector = ObjectDetector()
# 等待控制器服务 用于agent向控制器发命令
rospy.loginfo("等待 /execute_task 服务...")
rospy.wait_for_service('/execute_task')
self.task_client = rospy.ServiceProxy('/execute_task', AgentCommands)
# 订阅 LLM 指令 用于llm向agent发解析后的需求
self.sub = rospy.Subscriber('/llm_commands', LLMCommands, self._llm_callback)
rospy.loginfo("Agent 已启动,等待 LLM 指令...")
def _llm_callback(self, msg):
rospy.loginfo("收到 LLM 指令: action='%s', class_id=%d" % (msg.action_type, msg.target_class_id))
# 仅处理 pick_place 类型(可根据需要扩展)
if msg.action_type != "pick_place":
rospy.logwarn("不支持的动作类型: %s" % msg.action_type)
return
# 查询目标物体位置
pos = self.detector.get_position(msg.target_class_id)
if pos is None:
rospy.logerr("未检测到 class_id=%d 的物体!" % msg.target_class_id)
return
x, y, z = pos
rospy.loginfo("目标位置: (%.3f, %.3f, %.3f)" % (x, y, z))
# 构造 pick 和 place 位姿
pick_pose = self._create_pose(x, y, z)
# 安全放置位置
place_pose = self._create_pose(0.0, -1.8, 0.05)
# 发送任务请求
try:
resp = self.task_client(
target_pose=[pick_pose, place_pose],
action_type="pick_place"
)
if resp.success:
rospy.loginfo("Pick-and-place 任务成功!")
else:
rospy.logerr("任务失败: %s" % resp.message)
except rospy.ServiceException as e:
rospy.logerr("服务调用异常: %s" % str(e))
def _create_pose(self, x, y, z, frame_id="world"):
pose = PoseStamped()
pose.header.frame_id = frame_id
pose.pose.position.x = x
pose.pose.position.y = y
pose.pose.position.z = z
pose.pose.orientation.w = 1.0
return pose
def main():
rospy.init_node('task_agent')
agent = TaskAgent()
rospy.spin() # 保持节点运行,监听 LLM 指令
if __name__ == '__main__':
main()
当前阶段的代码内容较为简单,仅解析pick_place一个命令(因为执行模块目前只实现了这一个函数),通过视觉获取目标坐标,并将放置位置固定为(0.0, -1.8, 0.05)。这样设计的作用是为了初期验证。
3.Agent模块与控制模块的通信设计
与之前的通信设计不同的是,Agent与控制模块采用的是服务,即Agen的控制请求会得到控制模块的反馈。后面会考虑把LLM与Agent之间的通信也设计为服务,这样可以打通LLM、Agent、控制模块之前的反馈通路。
3.1.通信srv设计
在src/arm_application下,新建srv文件夹,加入如下srv文件

AgentCommands.srv
geometry_msgs/PoseStamped[] target_pose
string action_type # e.g. "pick", "place"
---
bool success
string message
Agent只向控制模块发送动作类型和目标坐标。
注意,这里的目标坐标为一个列表,第0项表示起始坐标,第1项表示终点坐标,在pick_place任务中,pick坐标便为起始坐标,place坐标便为终点坐标。
3.1.业务逻辑代码实现
scara_controller.py
# src/scara_controller.py
#!/usr/bin/env python3
import sys
import os
# 添加源代码目录到 Python 路径
script_dir = os.path.dirname(os.path.abspath(__file__))
if script_dir not in sys.path:
sys.path.insert(0, script_dir)
import rospy
from arm_application.srv import AgentCommands, AgentCommandsResponse
from geometry_msgs.msg import PoseStamped
from my_scara_action import ArmController
arm_controller = None
def handle_task(req):
# rospy.loginfo(f"收到任务: action='{req.action_type}', pos=({req.target_pose.pose.position.x:.2f}, {req.target_pose.pose.position.y:.2f}, {req.target_pose.pose.position.z:.2f})")
action = req.action_type
if action == "pick":
target_x = req.target_pose[0].pose.position.x
target_y = req.target_pose[0].pose.position.y
target_z = req.target_pose[0].pose.position.z
arm_controller.pick_and_place([target_x, target_y, target_z], [0.0, -1.8, 0.05 + 0.032])
arm_controller.arm_reset()
elif action == "place":
target_x = req.target_pose[0].pose.position.x
target_y = req.target_pose[0].pose.position.y
target_z = req.target_pose[0].pose.position.z
arm_controller.pick_and_place([target_x, target_y, target_z], [0.0, -1.8, 0.05 + 0.032])
arm_controller.arm_reset()
elif action == "pick_place":
pick_x = req.target_pose[0].pose.position.x
pick_y = req.target_pose[0].pose.position.y
pick_z = req.target_pose[0].pose.position.z
place_x = req.target_pose[1].pose.position.x
place_y = req.target_pose[1].pose.position.y
place_z = req.target_pose[1].pose.position.z
arm_controller.pick_and_place([pick_x, pick_y, pick_z], [place_x, place_y, place_z + 0.032])
arm_controller.arm_reset()
return AgentCommandsResponse(success=True, message="Mock success")
if __name__ == '__main__':
rospy.init_node('scara_controller')
arm_controller = ArmController()
s = rospy.Service('/execute_task', AgentCommands, handle_task)
rospy.loginfo("控制器已启动,等待任务...")
rospy.spin()
控制模块通过字符匹配动作类型,通过发布下来的坐标做对应的动作,最后反馈。
4.下一步的优化
可以发现,对于目前的系统而言,整体设计是比较粗糙的,目前只做了pick_palce命令的解析,通过命令行输入:
rostopic pub /llm_commands arm_application/LLMCommands "action_type: 'pick_place' target_class_id: 0"
各个模块做了对应的行动,在gazebo世界中成功将方块抓起并放置,实现了整体链路的验证。
下一步,我将做以下几点优化:
1.加入更多的指令,比如单独的pick、place
2.支持LLM也加入坐标参数,即LLM输出的指令不再只包含动作和目标,也可以加入坐标,这样便可以实现"把蓝色方块放到(0.1,-1.6,0.05)去"这样的命令解析。
3.将LLM与Agent之间的通信也改为服务通信,加入反馈。
4.基于第2点,考虑将当前的agent与llm合并,形成一个传统意义上的大模型agent,虽然这样可能会增多api调用的次数。
总结
实现了LLM、Agent、控制器之间的基本通信架构demo,完成了验证,下一步是在当前基础上做优化,丰富系统功能及健壮性。
更多推荐


所有评论(0)