# 一、构建包

进入并创建包:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python vision_node

这会生成一个包结构:

vision_node/
 ├── package.xml
 ├── setup.cfg
 ├── setup.py
 └── vision_node/
     └── __init__.py

# 二 、安装依赖

节点要用到:

  • rclpy(ROS 2 Python 客户端)
  • cv_bridge(ROS 图像格式转换)
  • opencv-python(图像处理
sudo apt install ros-foxy-cv-bridge
pip3 install opencv-python

# 三、编写主节点文件

cd ~/ros2_ws/src/vision_node/vision_node
touch vision_node.py
chmod +x vision_node.py
# vision_node模板:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu, Image
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
from cv_bridge import CvBridge
import cv2

class VisionNode(Node):
    def __init__(self):
        super().__init__('vision_node')
        self.get_logger().info("VisionNode started!")

        # 初始化 OpenCV 视频流(摄像头或视频文件)
        # - 视频文件路径(如 "videos/test.mp4")
        # - 图片路径(如 "images/frame1.jpg")
        #self.cap = cv2.VideoCapture(0)  # 改成路径例如 'video.mp4' 读取文件
        self.cap = cv2.VideoCapture("/home/zhangjingxuan/cvtest/src/vision_node/1.mp4")

        # 创建 ROS 订阅者
        self.create_subscription(Imu, '/imu', self.imu_callback, 10)
        self.create_subscription(Odometry, '/odom', self.odom_callback, 10)

        # 创建 ROS 发布者
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)

        # 创建图像桥
        self.bridge = CvBridge()

        # 创建定时器(30Hz)
        self.timer = self.create_timer(1/30.0, self.timer_callback)

    def imu_callback(self, msg: Imu):
        self.get_logger().info_once("IMU 订阅成功")
        ax = msg.linear_acceleration.x
        ay = msg.linear_acceleration.y
        az = msg.linear_acceleration.z
        self.get_logger().debug(f"IMU Accel: {ax:.2f}, {ay:.2f}, {az:.2f}")

    def odom_callback(self, msg: Odometry):
        self.get_logger().info_once("Odom 订阅成功")
        pos = msg.pose.pose.position
        self.get_logger().debug(f"Odom Pos: x={pos.x:.2f}, y={pos.y:.2f}")

    def timer_callback(self):
        ret, frame = self.cap.read()
        #frame = cv2.imread(self.input_source)
        if not ret:
            self.get_logger().warn("读取视频失败或结束")
            return

        # 简单显示视频帧
        cv2.imshow("VisionNode View", frame)
        cv2.waitKey(1)

        # 示例:发布简单控制命令
        twist = Twist()
        twist.linear.x = 0.1
        twist.angular.z = 0.0
        self.cmd_pub.publish(twist)

    def destroy_node(self):
        self.get_logger().info("Shutting down VisionNode...")
        self.cap.release()
        cv2.destroyAllWindows()
        super().destroy_node()


def main(args=None):
    rclpy.init(args=args)
    node = VisionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

# setup.py模板:
from setuptools import find_packages, setup

package_name = 'vision_node'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='zhangjingxuan',
    maintainer_email='1607782513@qq.com',
    description='i love u',
    license='MIT',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
        'vision_node = vision_node.vision_node:main'
        ],
    },
)