# 一、构建包
进入并创建包:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python vision_node
这会生成一个包结构:
vision_node/
├── package.xml
├── setup.cfg
├── setup.py
└── vision_node/
└── __init__.py
# 二 、安装依赖
节点要用到:
rclpy(ROS 2 Python 客户端)cv_bridge(ROS 图像格式转换)opencv-python(图像处理
sudo apt install ros-foxy-cv-bridge
pip3 install opencv-python
# 三、编写主节点文件
cd ~/ros2_ws/src/vision_node/vision_node
touch vision_node.py
chmod +x vision_node.py
# vision_node模板:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu, Image
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
from cv_bridge import CvBridge
import cv2
class VisionNode(Node):
def __init__(self):
super().__init__('vision_node')
self.get_logger().info("VisionNode started!")
# 初始化 OpenCV 视频流(摄像头或视频文件)
# - 视频文件路径(如 "videos/test.mp4")
# - 图片路径(如 "images/frame1.jpg")
#self.cap = cv2.VideoCapture(0) # 改成路径例如 'video.mp4' 读取文件
self.cap = cv2.VideoCapture("/home/zhangjingxuan/cvtest/src/vision_node/1.mp4")
# 创建 ROS 订阅者
self.create_subscription(Imu, '/imu', self.imu_callback, 10)
self.create_subscription(Odometry, '/odom', self.odom_callback, 10)
# 创建 ROS 发布者
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
# 创建图像桥
self.bridge = CvBridge()
# 创建定时器(30Hz)
self.timer = self.create_timer(1/30.0, self.timer_callback)
def imu_callback(self, msg: Imu):
self.get_logger().info_once("IMU 订阅成功")
ax = msg.linear_acceleration.x
ay = msg.linear_acceleration.y
az = msg.linear_acceleration.z
self.get_logger().debug(f"IMU Accel: {ax:.2f}, {ay:.2f}, {az:.2f}")
def odom_callback(self, msg: Odometry):
self.get_logger().info_once("Odom 订阅成功")
pos = msg.pose.pose.position
self.get_logger().debug(f"Odom Pos: x={pos.x:.2f}, y={pos.y:.2f}")
def timer_callback(self):
ret, frame = self.cap.read()
#frame = cv2.imread(self.input_source)
if not ret:
self.get_logger().warn("读取视频失败或结束")
return
# 简单显示视频帧
cv2.imshow("VisionNode View", frame)
cv2.waitKey(1)
# 示例:发布简单控制命令
twist = Twist()
twist.linear.x = 0.1
twist.angular.z = 0.0
self.cmd_pub.publish(twist)
def destroy_node(self):
self.get_logger().info("Shutting down VisionNode...")
self.cap.release()
cv2.destroyAllWindows()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = VisionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
# setup.py模板:
from setuptools import find_packages, setup
package_name = 'vision_node'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='zhangjingxuan',
maintainer_email='1607782513@qq.com',
description='i love u',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'vision_node = vision_node.vision_node:main'
],
},
)

