MiniVLA-(02):基于 Qwen2.5-VL 的端到端自动驾驶轨迹预测 Demo
修复后的代码
问题1
修复了。问题原因:Qwen2.5-VL 是多模态模型,它的 config 是嵌套结构,hidden_size 不在顶层,而在 config.text_config.hidden_size 里。
修复逻辑是三级查找:先试 config.hidden_size(兼容标准模型),再试 config.text_config.hidden_size(Qwen2.5-VL 的实际位置),最后兜底用 2048。
下边是优化后的代码:
"""
╔══════════════════════════════════════════════════════════════════════════╗
║ MiniVLA:基于 Qwen2.5-VL 的端到端自动驾驶轨迹预测 ║
║ Vision-Language-Action (VLA) 教学示例 ║
║ ║
║ 功能:用视觉语言模型理解道路场景 + 驾驶指令 → 预测未来行驶轨迹 ║
║ 适配:8GB 显卡(RTX 5060 / RTX 3060 / RTX 4060 等) ║
║ ║
║ 核心修复(相对原版): ║
║ 1. CPU offload 策略:VLM 放 CPU,轨迹头放 GPU,解决 8GB 显存 OOM ║
║ 无需 bitsandbytes,无 CUDA 版本依赖,任何 8GB 显卡都能跑 ║
║ 2. 动态获取 hidden_size,不再硬编码错误维度 ║
║ 3. 修复轨迹加载 bug(原版 token 查找逻辑错误导致轨迹全零) ║
║ 4. 修复指令生成逻辑(改用相邻帧航向差而非绝对航向角) ║
║ 5. 添加速度估算(基于相邻帧位移,不再硬编码 5.0) ║
║ 6. 添加完整推理演示函数 ║
╚══════════════════════════════════════════════════════════════════════════╝
"""
import os
# =====================================================================
# 【第零步:环境变量 —— 必须在所有 import 之前设置】
#
# 为什么要放最前面?
# 因为 transformers 库在 import 时就会读取这些环境变量来决定
# 从哪个服务器下载模型。如果放在 import 之后,就来不及了。
# =====================================================================
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com' # 国内镜像,解决 HuggingFace 下载慢/断连
os.environ["TRANSFORMERS_OFFLINE"] = "0" # 允许在线下载(设为 "1" 则纯离线模式)
os.environ['HUGGINGFACE_HUB_CACHE'] = './model_cache' # 模型缓存目录,避免重复下载
os.environ['TRANSFORMERS_CACHE'] = './model_cache' # 同上,兼容旧版 transformers
os.environ['HF_HUB_DOWNLOAD_TIMEOUT'] = '600' # 下载超时 10 分钟,防止大模型下载中断
# =====================================================================
# 【第一步:导入依赖库】
# =====================================================================
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import (
Qwen2_5_VLForConditionalGeneration,
AutoProcessor,
)
from nuscenes import NuScenes
from pyquaternion import Quaternion
import numpy as np
from PIL import Image
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")
# =====================================================================
# 【第二步:数据集类 —— 从 nuScenes 加载训练数据】
#
# 核心职责:
# 给定一个索引 idx,返回 (图像, 驾驶指令, 车速, 真实轨迹)
# 这四样东西构成一条完整的训练样本
#
# 数据流:
# nuScenes 数据集(真实道路采集)
# ↓
# CAM_FRONT 前视摄像头图片 → 模型的"眼睛"
# ego_pose 自车位姿 → 提取指令/速度/轨迹
# =====================================================================
class NuScenesTrajDataset(Dataset):
"""
nuScenes 驾驶数据集封装
每条样本包含:
- image: 前视摄像头 RGB 图像(PIL Image)
- command: 驾驶指令字符串("drive straight" / "turn left" / "turn right")
- speed: 自车速度 (m/s)
- traj: 未来轨迹,形状 (seq_len, 2),单位:米,相对于当前位置
"""
def __init__(self, nusc_root, nusc_version='v1.0-mini', seq_len=6, max_samples=100):
"""
参数:
nusc_root: nuScenes 数据集根目录
nusc_version: 数据集版本('v1.0-mini' 只有 10 个场景,适合调试)
seq_len: 预测未来多少个轨迹点(默认 6 个,约 3 秒)
max_samples: 最多加载多少条数据(调试用,正式训练可设为 None)
"""
# ------- 加载 nuScenes 元数据(索引表,不是图片本身) -------
self.nusc = NuScenes(version=nusc_version, dataroot=nusc_root, verbose=True)
self.seq_len = seq_len
# ------- 构建 token 列表和快速查找集合 -------
# token 是 nuScenes 中每条数据的唯一 ID(32位十六进制字符串)
# 类似数据库的主键,所有数据通过 token 互相关联
self.sample_tokens = [s['token'] for s in self.nusc.sample]
# 【修复】构建 token 集合,用于 O(1) 快速查找
# 原版错误:用 `token in self.nusc.sample`(在 list of dict 中查字符串,永远 False)
self.token_set = set(self.sample_tokens)
# 【修复】构建 token → sample 字典,避免重复调用 nusc.get()
self.token_to_sample = {s['token']: s for s in self.nusc.sample}
# 限制数据量(调试用)
if max_samples is not None:
self.sample_tokens = self.sample_tokens[:max_samples]
print(f"✅ 数据集加载完成,共 {len(self.sample_tokens)} 条样本")
def __len__(self):
"""DataLoader 需要知道数据集总共有多少条"""
return len(self.sample_tokens)
def _get_ego_pose(self, sample_token):
"""
辅助方法:根据 sample token 获取自车位姿
返回:
ego_pose dict,包含:
- translation: [x, y, z] 全局坐标(米)
- rotation: [w, x, y, z] 四元数表示的朝向
什么是四元数(Quaternion)?
三维空间中表示旋转的方式之一。相比欧拉角(yaw/pitch/roll),
四元数没有万向节死锁(Gimbal Lock)问题,数学运算也更稳定。
你可以简单理解为:它用 4 个数字编码了"物体面朝哪个方向"。
"""
sample = self.token_to_sample[sample_token]
cam_data = self.nusc.get('sample_data', sample['data']['CAM_FRONT'])
ego_pose = self.nusc.get('ego_pose', cam_data['ego_pose_token'])
return ego_pose
def __getitem__(self, idx):
"""
核心方法:返回第 idx 条训练样本
PyTorch 的 DataLoader 会反复调用这个方法:
- 训练时,DataLoader 自动处理 batch、shuffle、多进程加载
- 每次调用返回一条样本,DataLoader 把多条样本拼成一个 batch
"""
sample_token = self.sample_tokens[idx]
sample = self.token_to_sample[sample_token]
# ================================================================
# 2.1 加载前视摄像头图像
# ================================================================
# nuScenes 数据组织方式:
# sample(一个时间戳的完整数据)
# └── sample_data(某个传感器在该时间戳的数据)
# └── filename(图片文件相对路径)
#
# CAM_FRONT 是前视摄像头,分辨率 1600×900
cam_data = self.nusc.get('sample_data', sample['data']['CAM_FRONT'])
cam_path = os.path.join(self.nusc.dataroot, cam_data['filename'])
image = Image.open(cam_path).convert('RGB')
# ================================================================
# 2.2 生成驾驶指令(直行 / 左转 / 右转)
# ================================================================
# 【修复】原版用当前帧的绝对 yaw 角判断指令,这是错误的——
# 绝对 yaw 表示车头在全局坐标系中的朝向,跟"当前是否在转弯"无关。
# 正确做法:用相邻帧的 yaw 差值(航向变化率)来判断。
#
# 举例:车头一直朝东(yaw≈0)在直行 → 原版判断为直行 ✓
# 车头一直朝北(yaw≈π/2)在直行 → 原版判断为左转 ✗
# 用 yaw 差值的话,两种情况差值都≈0,正确判断为直行 ✓
current_pose = self._get_ego_pose(sample_token)
current_yaw = Quaternion(current_pose['rotation']).yaw_pitch_roll[0]
# 检查下一帧是否存在
next_token = sample.get('next', '')
if next_token and next_token in self.token_set:
next_pose = self._get_ego_pose(next_token)
next_yaw = Quaternion(next_pose['rotation']).yaw_pitch_roll[0]
# 计算航向变化量(弧度)
# yaw_diff > 0 表示逆时针旋转(左转)
# yaw_diff < 0 表示顺时针旋转(右转)
yaw_diff = next_yaw - current_yaw
# 处理角度跨越 ±π 的情况(比如从 179° 转到 -179° 实际只转了 2°)
if yaw_diff > np.pi:
yaw_diff -= 2 * np.pi
elif yaw_diff < -np.pi:
yaw_diff += 2 * np.pi
if yaw_diff > 0.05:
command = "turn left"
elif yaw_diff < -0.05:
command = "turn right"
else:
command = "drive straight"
else:
command = "drive straight" # 最后一帧默认直行
# ================================================================
# 2.3 估算自车速度
# ================================================================
# 【修复】原版硬编码 speed = 5.0,这里用相邻帧的位移估算
#
# 速度 = 距离 / 时间
# nuScenes 的采样间隔约 0.5 秒(2Hz)
if next_token and next_token in self.token_set:
next_pos = np.array(next_pose['translation'][:2])
curr_pos = np.array(current_pose['translation'][:2])
distance = np.linalg.norm(next_pos - curr_pos) # 欧几里得距离
dt = 0.5 # nuScenes 采样间隔约 0.5 秒
speed = distance / dt
else:
speed = 0.0
# ================================================================
# 2.4 加载未来真实轨迹(标签 / Ground Truth)
# ================================================================
#
# 这是"正确答案"——人类司机实际走过的路线。
# 训练时,模型的预测轨迹会和这个真实轨迹对比,算出误差(loss)。
#
# 数据结构:(seq_len, 2) = (6, 2)
# 6 个未来时间步,每步一个 (x, y) 坐标
# 单位:米,相对于当前车辆位置
#
# nuScenes 的 sample 通过 'next' 字段形成链表:
# sample_0 --next--> sample_1 --next--> sample_2 --next--> ...
# 每个 sample 间隔约 0.5 秒,6 个点覆盖约 3 秒的未来
traj = np.zeros((self.seq_len, 2), dtype=np.float32)
walk_token = sample_token
for i in range(self.seq_len):
# 沿链表往后走一步
if walk_token not in self.token_set:
break # 到达数据集末尾,剩余轨迹点保持为零
walk_sample = self.token_to_sample[walk_token]
next_walk = walk_sample.get('next', '')
if next_walk and next_walk in self.token_set:
future_pose = self._get_ego_pose(next_walk)
traj[i] = [
future_pose['translation'][0],
future_pose['translation'][1]
]
walk_token = next_walk
else:
# 没有下一帧了,用当前位置填充(轨迹不再延伸)
if i > 0:
traj[i] = traj[i - 1]
break
# ------- 轨迹归一化:全局坐标 → 相对坐标 -------
#
# 为什么要归一化?
# 全局坐标可能是 (1035.2, 567.8) 这样的大数
# 但模型不需要知道"我在地球上的哪个位置"
# 它只需要知道"接下来往哪走"(相对位移)
#
# 做法:每个轨迹点减去当前位置
# (1036.4, 567.9) - (1035.2, 567.8) = (1.2, 0.1)
# 含义:往前 1.2 米,往右偏 0.1 米
current_xy = np.array(current_pose['translation'][:2], dtype=np.float32)
traj = traj - current_xy[None, :] # None 增加一个维度用于广播
return image, command, np.float32(speed), torch.from_numpy(traj)
# =====================================================================
# 【第三步:模型定义 —— MiniVLA】
#
# 架构(CPU offload 策略):
# ┌─────────────────────────────────────────────────┐
# │ Qwen2.5-VL(冻结,放在 CPU 上) │
# │ ↓ 在 CPU 上理解图像 + 文字指令 │
# │ ↓ 输出 hidden_size 维的特征向量 │
# ├─────────────────────────────────────────────────┤
# │ 轨迹预测头 traj_head(放在 GPU 上,可训练) │
# │ ↓ hidden_size → 512 → 256 → 12 │
# │ ↓ 12 个数字 = 6 个轨迹点 × 2 个坐标 (x, y) │
# └─────────────────────────────────────────────────┘
#
# 为什么用 CPU offload 而不是 4-bit 量化?
# 4-bit 量化需要 bitsandbytes 库,它依赖特定版本的 CUDA Toolkit
# (必须安装与 PyTorch 编译版本匹配的 CUDA,否则会报
# "Missing dependency: libnvJitLink.so" 错误)
# CPU offload 零额外依赖,任何环境都能跑,只是推理慢一些
#
# 速度对比(100 条数据,20 epoch):
# - 4-bit GPU:约 30 分钟
# - CPU offload:约 2-3 小时
# 对教学场景来说完全可接受
#
# 为什么要冻结 VLM?
# 1. 省显存/内存:不需要存储 VLM 参数的梯度
# 2. 防遗忘:VLM 在海量数据上学到的视觉理解能力不会被破坏
# 3. 收敛快:只训练几千个参数的轨迹头,比训练几十亿参数快得多
# =====================================================================
class MiniVLA(nn.Module):
def __init__(self, vlm_name='Qwen/Qwen2.5-VL-3B-Instruct',
num_waypoints=6, freeze_vlm=True):
"""
参数:
vlm_name: HuggingFace 上的模型名称
num_waypoints: 预测的轨迹点数量
freeze_vlm: 是否冻结 VLM 参数
"""
super().__init__()
print(f"✅ 加载模型: {vlm_name}")
print(f" 策略: CPU offload(VLM 在 CPU,轨迹头在 GPU)")
# ---------------------------------------------------------------
# 3.1 加载预训练 VLM 到 CPU
# ---------------------------------------------------------------
# torch_dtype=torch.float16:半精度浮点数,每个参数占 2 字节
# 3B × 2 = 6GB 内存(在 CPU 内存中,不占 GPU 显存)
#
# 为什么放 CPU?
# 8GB 显存装不下 6GB 的模型 + PyTorch 自身开销
# 但 CPU 内存通常有 16-64GB,轻松容纳
# 代价:推理速度慢约 5-10 倍(但我们只训练轨迹头,可以接受)
self.vlm = Qwen2_5_VLForConditionalGeneration.from_pretrained(
vlm_name,
torch_dtype=torch.float16,
trust_remote_code=True,
)
# 明确放到 CPU(from_pretrained 默认就在 CPU,但显式声明更清晰)
self.vlm = self.vlm.to("cpu")
# 加载对应的数据预处理器(Processor)
# Processor 负责:
# - 图片:缩放、归一化像素值到 [0,1]
# - 文字:分词(tokenize),把自然语言变成数字 ID 序列
# - 对齐:把图片 token 和文字 token 拼在一起
self.processor = AutoProcessor.from_pretrained(
vlm_name, trust_remote_code=True
)
# ---------------------------------------------------------------
# 3.2 冻结 VLM 参数
# ---------------------------------------------------------------
if freeze_vlm:
for p in self.vlm.parameters():
p.requires_grad = False
self.vlm.eval() # 设为推理模式(关闭 Dropout 等)
print(" VLM 参数已冻结,只训练轨迹头")
# ---------------------------------------------------------------
# 3.3 构建轨迹预测头(这是唯一需要训练的部分)
# ---------------------------------------------------------------
# 【修复】动态获取 hidden_size,而不是硬编码
#
# 原版的 bug:
# 代码注释写着 "3B = 2048",但实际设置的是 hidden = 3584(7B 的维度)
# 加载 3B 模型 + 3584 维度 = 维度不匹配,运行时报错
#
# 正确做法:从模型配置中自动读取
# - Qwen2.5-VL-3B: hidden_size = 2048
# - Qwen2.5-VL-7B: hidden_size = 3584
#
# 注意:Qwen2.5-VL 是多模态模型,它的 config 结构是嵌套的:
# config
# ├── text_config (语言模型配置,hidden_size 在这里)
# └── vision_config (视觉编码器配置)
# 直接访问 config.hidden_size 会报 AttributeError
# 需要通过 config.text_config.hidden_size 获取
if hasattr(self.vlm.config, 'hidden_size'):
hidden = self.vlm.config.hidden_size
elif hasattr(self.vlm.config, 'text_config'):
hidden = self.vlm.config.text_config.hidden_size
else:
# 兜底:手动指定(Qwen2.5-VL-3B = 2048)
hidden = 2048
print(f" ⚠️ 无法自动获取 hidden_size,使用默认值 {hidden}")
print(f" VLM hidden_size = {hidden}")
# MLP(多层感知机)结构:
# hidden → 512:降维,把高维特征压缩
# ReLU:激活函数,引入非线性(没有非线性的话,多层线性等价于一层)
# Dropout(0.1):训练时随机丢弃 10% 的神经元,防止过拟合
# 512 → 256:进一步降维
# 256 → num_waypoints*2:输出层,6个点×2个坐标 = 12 个数字
self.traj_head = nn.Sequential(
nn.Linear(hidden, 512),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, num_waypoints * 2), # 输出 12 个数字
)
self.num_waypoints = num_waypoints
# 记录设备信息(用于 forward 中的跨设备操作)
self._traj_device = None # 在 train() 中设置
# 统计可训练参数量
trainable = sum(p.numel() for p in self.traj_head.parameters())
total_vlm = sum(p.numel() for p in self.vlm.parameters())
print(f" VLM 参数: {total_vlm:,}(冻结,在 CPU)")
print(f" 轨迹头参数: {trainable:,}(可训练,在 GPU)")
def forward(self, images, driving_commands, ego_speeds):
"""
前向传播:图像 + 指令 → 预测轨迹
参数:
images: PIL Image 列表,长度 = batch_size
driving_commands: 字符串列表,如 ["drive straight", "turn left"]
ego_speeds: 速度数组,形状 (batch_size,)
返回:
pred_traj: 预测轨迹,形状 (batch_size, 6, 2)
数据流(跨设备):
图片 + 文字 → Processor → token IDs (CPU)
token IDs → VLM (CPU) → 隐藏状态 (CPU)
最后一层最后一个 token → .to(GPU) → 轨迹头 (GPU) → 预测轨迹 (GPU)
"""
# ---- 构建文本 prompt ----
prompts = [
f'Command: {cmd}. Speed: {sp:.1f}m/s.'
for cmd, sp in zip(driving_commands, ego_speeds)
]
# ---- 数据预处理(在 CPU 上) ----
inputs = self.processor(
text=prompts,
images=images,
return_tensors='pt',
padding=True,
).to("cpu") # VLM 在 CPU,所以 inputs 也要在 CPU
# ---- VLM 前向推理(在 CPU 上,不计算梯度) ----
with torch.no_grad():
out = self.vlm(**inputs, output_hidden_states=True)
# ---- 提取特征向量并搬到 GPU ----
# out.hidden_states[-1]: 最后一层的输出
# [:, -1, :]: 最后一个 token 的特征
# .float(): fp16 → fp32(轨迹头用 fp32 更稳定)
# .to(traj_device): 从 CPU 搬到 GPU
h = out.hidden_states[-1][:, -1, :].float()
# 搬到轨迹头所在的设备(GPU)
traj_device = next(self.traj_head.parameters()).device
h = h.to(traj_device)
# ---- 轨迹预测(在 GPU 上) ----
traj = self.traj_head(h)
return traj.view(-1, self.num_waypoints, 2)
# =====================================================================
# 【第四步:训练函数】
#
# 深度学习训练的标准流程:
# 1. 准备数据(Dataset + DataLoader)
# 2. 构建模型
# 3. 定义损失函数(衡量"错了多少")
# 4. 定义优化器(决定"怎么改参数")
# 5. 循环训练:前向 → 算损失 → 反向传播 → 更新参数
# =====================================================================
def train():
"""完整的训练流程"""
# ======================== 配置参数 ========================
# 【修改这里】把路径改成你自己的 nuScenes 数据集路径
NUSCENES_ROOT = "/home/lionsking/data/nuscenes"
BATCH_SIZE = 1 # 每次送 1 张图给模型(8GB 显存只能 batch=1)
EPOCHS = 20 # 完整遍历数据集的次数
LR = 1e-3 # 学习率:每次更新参数时的步长
MAX_SAMPLES = 100 # 只用前 100 条数据(调试用,正式训练可增大)
# 自动选择设备
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"\n{'='*60}")
print(f"设备: {DEVICE}")
if DEVICE == "cuda":
gpu_name = torch.cuda.get_device_name(0)
gpu_mem = torch.cuda.get_device_properties(0).total_memory / 1024**3
print(f"GPU: {gpu_name} ({gpu_mem:.1f} GB)")
print(f"{'='*60}\n")
# ======================== 加载数据集 ========================
print("📦 加载 nuScenes 数据集...")
dataset = NuScenesTrajDataset(
nusc_root=NUSCENES_ROOT,
max_samples=MAX_SAMPLES,
)
# DataLoader:自动处理 batch、shuffle、多进程加载
# shuffle=True:每个 epoch 打乱数据顺序(防止模型记住顺序而非学习规律)
# num_workers=0:数据加载的子进程数(0 = 主进程加载,最稳定)
# pin_memory=True:锁页内存,加速 CPU→GPU 数据传输
# collate_fn:自定义 batch 组装方式(因为 PIL Image 不能直接 stack)
dataloader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=0, # 避免多进程和 nuScenes 的冲突
pin_memory=(DEVICE == "cuda"),
collate_fn=custom_collate, # 自定义 batch 拼接(见下方定义)
)
# ======================== 构建模型 ========================
print("\n🧠 构建 MiniVLA 模型...")
model = MiniVLA(freeze_vlm=True)
# VLM 已经在 CPU 上了(在 __init__ 中设置)
# 只把轨迹头搬到 GPU(它很小,只占几 MB 显存)
model.traj_head = model.traj_head.to(DEVICE)
print(f" VLM → CPU | 轨迹头 → {DEVICE}")
# ======================== 损失函数和优化器 ========================
# SmoothL1Loss(也叫 Huber Loss):
# - 当预测和真实差距大时,用 L1(|pred - true|),梯度恒定,不会爆炸
# - 当差距小时,用 L2((pred - true)²),更平滑,有利于精细调整
# - 对轨迹预测这种回归任务来说是最佳选择之一
loss_fn = nn.SmoothL1Loss()
# Adam 优化器:只优化轨迹头的参数
# Adam = Adaptive Moment Estimation
# 它为每个参数自适应调整学习率:
# - 变化大的参数:自动降低学习率(避免震荡)
# - 变化小的参数:自动提高学习率(加速收敛)
optimizer = torch.optim.Adam(model.traj_head.parameters(), lr=LR)
# ======================== 训练循环 ========================
print(f"\n🚀 开始训练({EPOCHS} 个 epoch,每 epoch {len(dataloader)} 个 batch)...\n")
model.traj_head.train() # 轨迹头设为训练模式(启用 Dropout)
best_loss = float('inf')
for epoch in range(EPOCHS):
total_loss = 0.0
pbar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{EPOCHS}")
for batch_idx, (images, commands, speeds, gt_traj) in enumerate(pbar):
# gt_traj: 真实轨迹 (Ground Truth),形状 (batch, 6, 2)
gt_traj = gt_traj.to(DEVICE)
# ---- 前向传播:图像+指令 → 预测轨迹 ----
pred_traj = model(images, commands, speeds)
# ---- 计算损失:预测轨迹 vs 真实轨迹 ----
loss = loss_fn(pred_traj, gt_traj)
# ---- 反向传播三步曲 ----
optimizer.zero_grad() # 1. 清空旧梯度(PyTorch 默认累加梯度)
loss.backward() # 2. 计算每个参数对 loss 的贡献(梯度)
optimizer.step() # 3. 按梯度方向更新参数(让 loss 变小)
total_loss += loss.item()
pbar.set_postfix(loss=f"{loss.item():.4f}")
# 本 epoch 的平均损失
avg_loss = total_loss / len(dataloader)
print(f" Epoch {epoch+1} 平均损失: {avg_loss:.4f}")
# 保存最优模型
if avg_loss < best_loss:
best_loss = avg_loss
torch.save(model.traj_head.state_dict(), 'vla_traj_head_best.pth')
print(f" 💾 最优模型已保存 (loss={best_loss:.4f})")
# 保存最终模型
torch.save(model.traj_head.state_dict(), 'vla_traj_head_final.pth')
print(f"\n✅ 训练完成!最终模型: vla_traj_head_final.pth")
print(f" 最佳模型: vla_traj_head_best.pth (loss={best_loss:.4f})")
def custom_collate(batch):
"""
自定义 batch 组装函数
为什么需要这个?
默认的 collate_fn 会尝试把所有数据 stack 成 tensor,
但 PIL Image 不能直接 stack(大小可能不同、类型不是 tensor)。
所以我们手动处理:
- 图像:保持为 PIL Image 列表(Processor 会统一处理)
- 指令:保持为字符串列表
- 速度和轨迹:正常 stack 成 tensor
"""
images, commands, speeds, trajs = zip(*batch)
return (
list(images), # PIL Image 列表
list(commands), # 字符串列表
torch.tensor(speeds), # (batch,)
torch.stack(trajs), # (batch, 6, 2)
)
# =====================================================================
# 【第五步:推理演示 —— 用训练好的模型预测轨迹】
# =====================================================================
def demo_inference():
"""
加载训练好的模型,对单张图片做轨迹预测(演示用)
"""
NUSCENES_ROOT = "/home/lionsking/data/nuscenes"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# 加载模型
model = MiniVLA(freeze_vlm=True)
model.traj_head = model.traj_head.to(DEVICE)
# 加载训练好的轨迹头权重
model.traj_head.load_state_dict(
torch.load('vla_traj_head_best.pth', map_location=DEVICE)
)
model.traj_head.eval() # 设为推理模式(关闭 Dropout)
# 加载一张测试图片
dataset = NuScenesTrajDataset(nusc_root=NUSCENES_ROOT, max_samples=10)
image, command, speed, gt_traj = dataset[0]
print(f"\n指令: {command}")
print(f"速度: {speed:.1f} m/s")
# 预测
with torch.no_grad():
pred = model([image], [command], [speed])
pred = pred.cpu().numpy()[0] # (6, 2)
print(f"\n预测轨迹(相对坐标,单位:米):")
for i, (x, y) in enumerate(pred):
print(f" 第 {i+1} 个点: 前方 {x:.2f}m, 侧向 {y:.2f}m")
gt = gt_traj.numpy()
print(f"\n真实轨迹:")
for i, (x, y) in enumerate(gt):
print(f" 第 {i+1} 个点: 前方 {x:.2f}m, 侧向 {y:.2f}m")
# =====================================================================
# 【入口】
# =====================================================================
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "demo":
# 推理演示:python mini_vla_nuscenes_fixed.py demo
demo_inference()
else:
# 默认:训练
train()
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)