从零到端到端自动驾驶实战-第三章-CARLA 仿真与模仿学习

一、启动Carla环境

carla 安装,请参考之前的博文;

(base) lionsking@ai-dev:~$ conda env list

# conda environments:
#
# * -> active
# + -> frozen
base                 *   /home/lionsking/miniconda3
bevformer                /home/lionsking/miniconda3/envs/bevformer
carla                    /home/lionsking/miniconda3/envs/carla
uniad2.0                 /home/lionsking/miniconda3/envs/uniad2.0

(base) lionsking@ai-dev:~$ conda activate carla
(carla) lionsking@ai-dev:~$ cd /home/lionsking/carla
(carla) lionsking@ai-dev:~/carla$ ./CarlaUE4.sh -windowed -ResX=1280 -ResY=720 -quality-level=Epic
4.26.2-0+++UE4+Release-4.26 522 0
Disabling core dumps.

file

你的第一个CARLA脚本:生成一辆车并让它自动开
01_spawn_vehicle.py

import carla, random, time

# 连接到CARLA服务端
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

#从蓝图库中找到Tesla
bp_lib = world.get_blueprint_library()
vehicle_bp = bp_lib.filter('vehicle.tesla.model3')[0]

#随机选一个出生点
spawn_point=random.choice(world.get_map().get_spawn_points())
#生成车辆
vehicle=world.spawn_actor(vehicle_bp,spawn_point)
print(f'✅车辆已生成: {vehicle.type_id}')
#开启自动驾驶(CARLA内置的规则引擎,不是神经网络)
vehicle.set_autopilot(True)
print('✅Autopilot已开启,车辆正在自动行驶...')
#观察30秒
time.sleep(30)
#清理(必须!否则车会留在场景里)
vehicle.destroy()
print('✅车辆已销毁')

代码执行结果:

(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ python 01_spawn_vehicle.py
✅车辆已生成: vehicle.tesla.model3
✅Autopilot已开启,车辆正在自动行驶...
✅车辆已销毁

3.2 绑定摄像头 + 数据采集脚本

现在车能自己开了,但我们需要“看到”它看到的世界。下一步是给车辆安装一个前视摄像头,并收集“图像 + 方向盘/油门”的配对数据。

概念解释:模仿学习(Imitation Learning)
模仿学习的思路很简单:找一个“老司机”开给你看,记录他每一刻看到的画面和做的操作,然后训练神经网络学会“看到这个画面就做这个操作”。在 CARLA 中,“老司机”就是 CARLA 内置的 Autopilot(一个基于规则的自动驾驶引擎)。

完整的数据采集脚本:
02_collect_data.py

# 02_collect_data.py
# 采集 CARLA Autopilot 的驾驶数据:图像 + 方向盘 + 油门
import carla, random, time, os, csv
import numpy as np

# ===== 配置 =====
IMG_WIDTH, IMG_HEIGHT = 640, 480
SAVE_DIR = '/data/projects/carla_data'
NUM_FRAMES = 5000  # 采集 5000 帧
# sudo chmod -R 777 /data/projects/carla_data
os.makedirs(f'{SAVE_DIR}/images', exist_ok=True)

# ===== 连接 CARLA =====
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

# 开启同步模式(确保图像和控制信号帧对齐)
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05  # 20 FPS
world.apply_settings(settings)

# ===== 生成车辆 =====
bp_lib = world.get_blueprint_library()
vehicle_bp = bp_lib.filter('vehicle.tesla.model3')[0]
spawn_point = random.choice(world.get_map().get_spawn_points())
vehicle = world.spawn_actor(vehicle_bp, spawn_point)
vehicle.set_autopilot(True)

# ===== 安装前视摄像头 =====
camera_bp = bp_lib.find('sensor.camera.rgb')
camera_bp.set_attribute('image_size_x', str(IMG_WIDTH))
camera_bp.set_attribute('image_size_y', str(IMG_HEIGHT))
camera_bp.set_attribute('fov', '110')  # 视场角 110°

# 摄像头位置:车前方 1.5米、高度 2.4米
camera_transform = carla.Transform(
    carla.Location(x=1.5, z=2.4),
    carla.Rotation(pitch=-15)  # 微微俯视
)
camera = world.spawn_actor(camera_bp, camera_transform,
                           attach_to=vehicle)

# ===== 数据采集回调 =====
frame_data = []  # 存储每一帧的数据
current_image = None

def camera_callback(image):
    global current_image
    # 把 CARLA 图像转为 numpy 数组
    array = np.frombuffer(image.raw_data, dtype=np.uint8)
    array = array.reshape((IMG_HEIGHT, IMG_WIDTH, 4))  # BGRA
    current_image = array[:, :, :3]  # 去掉 Alpha 通道

camera.listen(camera_callback)

# ===== 主采集循环 =====
csv_file = open(f'{SAVE_DIR}/labels.csv', 'w', newline='')
writer = csv.writer(csv_file)
writer.writerow(['frame', 'steer', 'throttle', 'brake', 'speed'])

print(f'开始采集 {NUM_FRAMES} 帧数据...')
for frame_id in range(NUM_FRAMES):
    world.tick()  # 同步模式下手动推进一帧

    if current_image is None:
        continue

    # 获取当前控制信号(Autopilot 的操作)
    control = vehicle.get_control()
    velocity = vehicle.get_velocity()
    speed = (velocity.x**2 + velocity.y**2 + velocity.z**2)**0.5

    # 保存图片
    import cv2
    img_path = f'{SAVE_DIR}/images/frame_{frame_id:06d}.jpg'
    cv2.imwrite(img_path, current_image)

    # 保存标签
    writer.writerow([
        frame_id,
        round(control.steer, 4),     # 方向盘 [-1, 1]
        round(control.throttle, 4),   # 油门 [0, 1]
        round(control.brake, 4),      # 刹车 [0, 1]
        round(speed, 2)               # 速度 m/s
            ])

    if frame_id % 500 == 0:
        print(f'  已采集 {frame_id}/{NUM_FRAMES} 帧')

csv_file.close()
camera.stop()
camera.destroy()
vehicle.destroy()
print(f'✅ 采集完成!数据保存在 {SAVE_DIR}')

执行结果:

(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ python 02_collect_data.py 
开始采集 5000 帧数据...
  已采集 500/5000 帧
  已采集 1000/5000 帧
  已采集 1500/5000 帧
  已采集 2000/5000 帧
  已采集 2500/5000 帧
  已采集 3000/5000 帧
  已采集 3500/5000 帧
  已采集 4000/5000 帧
  已采集 4500/5000 帧
✅ 采集完成!数据保存在 /data/projects/carla_data

file

️> 同步模式为什么重要?

异步模式下,CARLA 服务端和 Python 客户端以不同速率运行。你获取的图像和控制信号可能不是同一帧的!同步模式下,服务端会等客户端调用 world.tick() 后才推进一帧,确保数据帧对齐。这对于训练数据的质量至关重要。

3.3 第一个模仿学习模型:ResNet → steer/throttle

3.3.1 模型设计思路

我们的第一个模型非常简单:输入一张摄像头图片,输出方向盘角度和油门深度。用 ResNet-18 作为图像编码器(ImageNet 预训练),后接一个 MLP 头输出控制值。
03_model.py

# 03_model.py
import torch
import torch.nn as nn
import torchvision.models as models

class DrivingModel(nn.Module):
    """
    最简单的端到端驾驶模型:
    输入:RGB 图像 (3, 224, 224)
    输出:steer (-1~1) + throttle (0~1)
    """
    def __init__(self):
        super().__init__()
        # ResNet18:一个超强的图像识别神经网络,18 层。
        # 预训练(pretrained=True):它已经在 1400 万张图片上学过边缘、形状、物体、道路、车、人等所有视觉知识。
        # 用预训练的 ResNet-18 作为视觉编码器
        resnet = models.resnet18(pretrained=True)

        # 去掉最后的分类层,只保留特征提取部分
        # resnet.children():把 ResNet 所有层拆成一个列表, [:-1]:去掉最后一层(最后一层是 1000 类分类器,对你没用)
        # nn.Sequential(...):把剩下的层重新打包
        # 命名 backbone = 主干特征提取器
        # ✅ 去掉最后一层后,ResNet 变成什么?变成一个纯图像特征提取器,输入图片 → 输出 512 维特征向量
        # 把现成 ResNet 的层拆出来再打包
        self.backbone = nn.Sequential(*list(resnet.children())[:-1])

        # 控制头:512维特征 → 2个输出值
        # 自己新建层再打包, 神经网络模块(nn.Module)
        # nn.Sequential 是 PyTorch 内置的函数 / 类, 它创建出来的东西,叫 “模块(Module)”
        self.control_head = nn.Sequential(
            nn.Linear(512, 256), # 第一层:输入512 → 输出256, 作用:把高维特征压缩、提炼关键信息
            nn.ReLU(), # 激活函数:增加非线性, 给网络加非线性能力, 没有它,网络再深也只是线性变换,学不会复杂逻辑
            nn.Dropout(0.3),  ## 随机丢30%神经元,防止过拟合, 防止网络死记硬背训练集(过拟合)
            nn.Linear(256, 64), # 第二层:256 → 64, 让网络专注学习 “转向、油门” 相关的特征
            nn.ReLU(), # 再激活
            nn.Linear(64, 2)  # [steer, throttle], # 输出 2 个值:方向盘、油门
        )

        # 把图像特征 → 变成驾驶动作(输入:512 维特征, 输出:2 个数 → [转向角, 油门])

    def forward(self, image):
        """
        前向传播 forward(数据怎么流)
        这是模型真正运行时的逻辑。

        ---
        3. 去掉之后,后面怎么做?
        流程如下:

        图片 → 预训练 ResNet(去掉最后一层)→ 512 维特征
        512 维特征 → 你的小网络 → 2 个输出值
        用 tanh /sigmoid 限制范围 → 得到方向盘、油门

        这就叫 迁移学习(Transfer Learning)

        ---
        输入图像 (3×224×224)
                ↓
        预训练 ResNet18(去掉最后一层)→ 提取图像特征
                ↓
        512维特征向量
                ↓
        自定义全连接层(控制头)
                ↓
        输出 2 个值
                ↓
        steer: tanh → [-1, 1]
        throttle: sigmoid → [0, 1]

        ---
        六、这个模型的专业名称
        端到端自动驾驶模型 + 迁移学习 + ResNet 主干

        端到端:图像直接输出控制量
        迁移学习:用别人训练好的模型做自己的任务
        Backbone:特征提取主干
        Head:任务头(输出控制指令)

        """
        #(1)图像 → 特征, backbone 就是去掉最后一层的预训练 ResNet
        #    输出形状:批量 B × 512 通道 × 1×1(特征图)
        # (B, 3, 224, 224)= (批量大小, 通道数, 高度, 宽度)
        # 输出: 每张图变成 512 个数字的特征图。
        features = self.backbone(image)  # (B, 512, 1, 1)

        #(2)展平成向量
        # 把 512×1×1 展平成 512 维向量
        features = features.flatten(1)    # (B, 512)

        #(3)送入控制头 → 得到 2 个输出
        output = self.control_head(features)  # (B, 2)

        # 输出范围限制(真实驾驶必须)
        # steer 用 tanh 限制到 [-1, 1]
        # 方向盘 steer:必须在 -1(左)~ 1(右)→ 使用 tanh 自动压缩到 [-1,1]
        steer = torch.tanh(output[:, 0])

        # 油门 throttle:必须在 0(不踩)~ 1(踩满) → 使用 sigmoid 自动压缩到 [0,1]
        # throttle 用 sigmoid 限制到 [0, 1]
        throttle = torch.sigmoid(output[:, 1])
        return steer, throttle

    #-------------------
    """
    # 为什么能写成 self.backbone(image)?
    # 因为 PyTorch 所有网络层,内部都实现了 __call__
    # 只要一个类实现了 __call__,它就能像函数一样使用。这是 Python 规则,不是魔法!
    # 举个最简单的例子(你马上懂)

    class MyFunction:
        def __call__(self, x):
            return x * 2

    obj = MyFunction()
    result = obj(10)  # 能调用!像函数一样!
    print(result)     # 输出 20

    # nn.Sequential 内部就像上面的 MyFunction,它实现了 __call__
    # self.backbone 是对象, 但能像函数一样调用:self.backbone (参数)

    6. 最最最核心总结(你一定能懂)
    **self.backbone 是一个 PyTorch 模块
    所有 PyTorch 模块都能像函数一样调用:模块 (输入)
    调用时必须传输入,因为网络需要输入才能计算
    forward 里的 image 就是这个输入 **

    7. 用 3 行话彻底终结你的困惑
    self.backbone 是神经网络,不是普通变量
    神经网络必须接收输入才能运行
    所以你写 self.backbone(image),把图片传给它

    """
    ###

为者常成,行者常至