Origincar智能小车巡线代码
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)# 修复:空格错误。self.roi_height_ratio = self.get_parameter("roi_height_ratio").value# 修复:变量名拼写错误。cv2.circle(roi, (cx, cy), 5,
import rclpy
from rclpy.node import Node
# 保留 CompressedImage(订阅用),Image 可选(如果不需要普通 Image 可删除)
from sensor_msgs.msg import CompressedImage, Image
from geometry_msgs.msg import Twist
from cv_bridge import CvBridge
import cv2
import numpy as np
class LineFollowerNode(Node):
def __init__(self):
super().__init__("line_follower_node")
# 1. 初始化参数(可通过 ROS2 参数服务器修改)
self.declare_parameter("image_topic", "/image") # 压缩图像话题(通常是 /image/compressed,根据实际修改)
self.declare_parameter("cmd_vel_topic", "/cmd_vel") # 速度控制话题
self.declare_parameter("hsv_low", [0, 0, 0]) # 黑色轨迹 HSV 下限
self.declare_parameter("hsv_high", [100, 150, 50]) # 黑色轨迹 HSV 上限
self.declare_parameter("forward_speed", 0.2) # 前进速度(m/s)
self.declare_parameter("turn_gain", 0.002) # 转向增益(越大转向越灵敏)
self.declare_parameter("roi_height_ratio", 0.3) # 感兴趣区域(ROI)高度比例(只关注图像下方 30%)
# 获取参数
self.image_topic = self.get_parameter("image_topic").value
self.cmd_vel_topic = self.get_parameter("cmd_vel_topic").value
self.hsv_low = self.get_parameter("hsv_low").value
self.hsv_high = self.get_parameter("hsv_high").value
self.forward_speed = self.get_parameter("forward_speed").value
self.turn_gain = self.get_parameter("turn_gain").value
self.roi_height_ratio = self.get_parameter("roi_height_ratio").value # 修复:变量名拼写错误
# 2. 初始化 ROS 组件
self.bridge = CvBridge() # 图像格式转换(ROS CompressedImage ↔ OpenCV Mat)
self.cmd_vel_pub = self.create_publisher(Twist, self.cmd_vel_topic, 10)
# 修复:订阅类型改为 CompressedImage
self.image_sub = self.create_subscription(
CompressedImage, self.image_topic, self.image_callback, 10
)
# 3. 初始化变量
self.image_width = 640 # 摄像头图像宽度(根据实际设备修改)
self.image_height = 480 # 摄像头图像高度(根据实际设备修改)
self.midpoint = self.image_width // 2 # 修复:缺少整除符号
self.get_logger().info("Line follower node started! (CompressedImage version)")
self.get_logger().info(f"Subscribed to compressed image topic: {self.image_topic}")
def image_callback(self, msg):
"""图像回调函数:处理压缩图像并计算控制指令"""
try:
# 1. 关键修复:压缩图像转换为 OpenCV 格式(使用 compressed_imgmsg_to_cv2)
# desired_encoding 可选:"bgr8"(默认)、"rgb8" 等
cv_image = self.bridge.compressed_imgmsg_to_cv2(msg, desired_encoding="bgr8")
self.image_height, self.image_width = cv_image.shape[:2]
self.midpoint = self.image_width // 2 # 修复:缺少整除符号
# 2. 预处理:裁剪感兴趣区域(ROI,只关注图像下方部分,减少干扰)
roi_y = int(self.image_height * (1 - self.roi_height_ratio)) # 使用正确的变量名
roi = cv_image[roi_y:, :] # 裁剪 [y_start:y_end, x_start:x_end]
# 3. 颜色空间转换:BGR → HSV(HSV 颜色空间对光照变化更鲁棒)
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) # 修复:空格错误
# 4. 阈值分割:提取黑色轨迹(生成轨迹为白色、背景为黑色的二值图)
hsv_low = np.array(self.hsv_low, dtype=np.uint8)
hsv_high = np.array(self.hsv_high, dtype=np.uint8)
mask = cv2.inRange(hsv, hsv_low, hsv_high)
# 5. 形态学操作:去除噪声(膨胀+腐蚀)
kernel = np.ones((5, 5), np.uint8)
mask = cv2.dilate(mask, kernel, iterations=1) # 膨胀:连接断裂的轨迹
mask = cv2.erode(mask, kernel, iterations=1) # 腐蚀:去除细小噪声
# 6. 轮廓检测:找到轨迹的轮廓
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # 修复:空格错误
if contours:
# 7. 选择最大轮廓(假设最大轮廓是轨迹)
max_contour = max(contours, key=cv2.contourArea)
# 8. 计算轮廓重心(轨迹中心)
M = cv2.moments(max_contour)
if M["m00"] != 0: # 避免除零
cx = int(M["m10"] / M["m00"]) # 重心 x 坐标(相对于 ROI)
cy = int(M["m01"] / M["m00"]) # 重心 y 坐标(相对于 ROI)
# 9. 计算偏差:重心与图像中点的差值(正数→轨迹偏右,负数→轨迹偏左)
error = cx - self.midpoint
# 10. 生成控制指令(比例控制)
twist = Twist()
twist.linear.x = self.forward_speed # 固定前进速度
twist.angular.z = -self.turn_gain * error # 转向控制(负号修正方向)
# 限制角速度范围(避免过度转向)
twist.angular.z = max(min(twist.angular.z, 0.5), -0.5)
# 发布速度指令
self.cmd_vel_pub.publish(twist)
# 可视化:在 ROI 上绘制重心和轨迹轮廓
cv2.circle(roi, (cx, cy), 5, (0, 255, 0), -1) # 重心(绿色)
cv2.drawContours(roi, [max_contour], -1, (255, 0, 0), 2) # 轮廓(蓝色)
# 调试用:显示处理后的图像(按 Ctrl+C 关闭窗口)
cv2.imshow("ROI + Contour (Compressed)", roi)
cv2.imshow("Mask (Compressed)", mask)
cv2.waitKey(1) # 必须加,否则图像窗口会卡死
else:
# 未检测到轨迹时:停止前进,缓慢旋转寻找轨迹
self.get_logger().warn("No line detected! Rotating to find line...")
twist = Twist()
twist.linear.x = 0.0
twist.angular.z = 0.3 # 顺时针旋转(可改为 -0.3 逆时针)
self.cmd_vel_pub.publish(twist)
# 关闭图像窗口(避免无图像时窗口残留)
cv2.destroyAllWindows()
except Exception as e:
self.get_logger().error(f"Error processing compressed image: {str(e)}")
cv2.destroyAllWindows()
def destroy_node(self):
"""节点销毁时:停止小车并关闭图像窗口"""
twist = Twist() # 修复:创建正确的 Twist 实例
self.cmd_vel_pub.publish(twist)
cv2.destroyAllWindows()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = LineFollowerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("Node stopped by user (Ctrl+C)")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
#开源新春集福 #地瓜机器人 #OpenLoong开源社区 #AtomGit

更多推荐



所有评论(0)