import rclpy
from rclpy.node import Node
# 保留 CompressedImage(订阅用),Image 可选(如果不需要普通 Image 可删除)
from sensor_msgs.msg import CompressedImage, Image
from geometry_msgs.msg import Twist
from cv_bridge import CvBridge
import cv2
import numpy as np

class LineFollowerNode(Node):
    def __init__(self):
        super().__init__("line_follower_node")

        # 1. 初始化参数(可通过 ROS2 参数服务器修改)
        self.declare_parameter("image_topic", "/image")  # 压缩图像话题(通常是 /image/compressed,根据实际修改)
        self.declare_parameter("cmd_vel_topic", "/cmd_vel")  # 速度控制话题
        self.declare_parameter("hsv_low", [0, 0, 0])         # 黑色轨迹 HSV 下限
        self.declare_parameter("hsv_high", [100, 150, 50])   # 黑色轨迹 HSV 上限
        self.declare_parameter("forward_speed", 0.2)         # 前进速度(m/s)
        self.declare_parameter("turn_gain", 0.002)           # 转向增益(越大转向越灵敏)
        self.declare_parameter("roi_height_ratio", 0.3)      # 感兴趣区域(ROI)高度比例(只关注图像下方 30%)

        # 获取参数
        self.image_topic = self.get_parameter("image_topic").value
        self.cmd_vel_topic = self.get_parameter("cmd_vel_topic").value
        self.hsv_low = self.get_parameter("hsv_low").value
        self.hsv_high = self.get_parameter("hsv_high").value
        self.forward_speed = self.get_parameter("forward_speed").value
        self.turn_gain = self.get_parameter("turn_gain").value
        self.roi_height_ratio = self.get_parameter("roi_height_ratio").value  # 修复:变量名拼写错误

        # 2. 初始化 ROS 组件
        self.bridge = CvBridge()  # 图像格式转换(ROS CompressedImage ↔ OpenCV Mat)
        self.cmd_vel_pub = self.create_publisher(Twist, self.cmd_vel_topic, 10)
        # 修复:订阅类型改为 CompressedImage
        self.image_sub = self.create_subscription(
            CompressedImage, self.image_topic, self.image_callback, 10
        )

        # 3. 初始化变量
        self.image_width = 640  # 摄像头图像宽度(根据实际设备修改)
        self.image_height = 480 # 摄像头图像高度(根据实际设备修改)
        self.midpoint = self.image_width // 2  # 修复:缺少整除符号

        self.get_logger().info("Line follower node started! (CompressedImage version)")
        self.get_logger().info(f"Subscribed to compressed image topic: {self.image_topic}")

    def image_callback(self, msg):
        """图像回调函数:处理压缩图像并计算控制指令"""
        try:
            # 1. 关键修复:压缩图像转换为 OpenCV 格式(使用 compressed_imgmsg_to_cv2)
            # desired_encoding 可选:"bgr8"(默认)、"rgb8" 等
            cv_image = self.bridge.compressed_imgmsg_to_cv2(msg, desired_encoding="bgr8")
            self.image_height, self.image_width = cv_image.shape[:2]
            self.midpoint = self.image_width // 2  # 修复:缺少整除符号

            # 2. 预处理:裁剪感兴趣区域(ROI,只关注图像下方部分,减少干扰)
            roi_y = int(self.image_height * (1 - self.roi_height_ratio))  # 使用正确的变量名
            roi = cv_image[roi_y:, :]  # 裁剪 [y_start:y_end, x_start:x_end]

            # 3. 颜色空间转换:BGR → HSV(HSV 颜色空间对光照变化更鲁棒)
            hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)  # 修复:空格错误

            # 4. 阈值分割:提取黑色轨迹(生成轨迹为白色、背景为黑色的二值图)
            hsv_low = np.array(self.hsv_low, dtype=np.uint8)
            hsv_high = np.array(self.hsv_high, dtype=np.uint8)
            mask = cv2.inRange(hsv, hsv_low, hsv_high)

            # 5. 形态学操作:去除噪声(膨胀+腐蚀)
            kernel = np.ones((5, 5), np.uint8)
            mask = cv2.dilate(mask, kernel, iterations=1)  # 膨胀:连接断裂的轨迹
            mask = cv2.erode(mask, kernel, iterations=1)   # 腐蚀:去除细小噪声

            # 6. 轮廓检测:找到轨迹的轮廓
            contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)  # 修复:空格错误

            if contours:
                # 7. 选择最大轮廓(假设最大轮廓是轨迹)
                max_contour = max(contours, key=cv2.contourArea)

                # 8. 计算轮廓重心(轨迹中心)
                M = cv2.moments(max_contour)
                if M["m00"] != 0:  # 避免除零
                    cx = int(M["m10"] / M["m00"])  # 重心 x 坐标(相对于 ROI)
                    cy = int(M["m01"] / M["m00"])  # 重心 y 坐标(相对于 ROI)

                    # 9. 计算偏差:重心与图像中点的差值(正数→轨迹偏右,负数→轨迹偏左)
                    error = cx - self.midpoint

                    # 10. 生成控制指令(比例控制)
                    twist = Twist()
                    twist.linear.x = self.forward_speed  # 固定前进速度
                    twist.angular.z = -self.turn_gain * error  # 转向控制(负号修正方向)

                    # 限制角速度范围(避免过度转向)
                    twist.angular.z = max(min(twist.angular.z, 0.5), -0.5)

                    # 发布速度指令
                    self.cmd_vel_pub.publish(twist)

                    # 可视化:在 ROI 上绘制重心和轨迹轮廓
                    cv2.circle(roi, (cx, cy), 5, (0, 255, 0), -1)  # 重心(绿色)
                    cv2.drawContours(roi, [max_contour], -1, (255, 0, 0), 2)  # 轮廓(蓝色)

                # 调试用:显示处理后的图像(按 Ctrl+C 关闭窗口)
                cv2.imshow("ROI + Contour (Compressed)", roi)
                cv2.imshow("Mask (Compressed)", mask)
                cv2.waitKey(1)  # 必须加,否则图像窗口会卡死

            else:
                # 未检测到轨迹时:停止前进,缓慢旋转寻找轨迹
                self.get_logger().warn("No line detected! Rotating to find line...")
                twist = Twist()
                twist.linear.x = 0.0
                twist.angular.z = 0.3  # 顺时针旋转(可改为 -0.3 逆时针)
                self.cmd_vel_pub.publish(twist)

                # 关闭图像窗口(避免无图像时窗口残留)
                cv2.destroyAllWindows()

        except Exception as e:
            self.get_logger().error(f"Error processing compressed image: {str(e)}")
            cv2.destroyAllWindows()

    def destroy_node(self):
        """节点销毁时:停止小车并关闭图像窗口"""
        twist = Twist()  # 修复:创建正确的 Twist 实例
        self.cmd_vel_pub.publish(twist)
        cv2.destroyAllWindows()
        super().destroy_node()

def main(args=None):
    rclpy.init(args=args)
    node = LineFollowerNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info("Node stopped by user (Ctrl+C)")
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

#开源新春集福 #地瓜机器人 #OpenLoong开源社区 #AtomGit

 

 

 

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐