从校巴相撞看预警Agent的容错设计

问题背景:为什么单次预警不够?

2026年5月26日,比利时Buggenhout,一辆载着7名儿童和2名成人的校巴在平交道口被火车撞击,数人死亡。据警方透露,事发时栏杆已放下。你可以想象这个场景:系统发出了警告,却依然没能阻止悲剧。

作为技术开发者,你一定遇到过类似困境——你写的预警模块正确发出了信号,但下游执行器(栏杆、刹车、通知)失败了,或者操作者忽略了。单次“响应”机制在物理世界中是脆弱的。

这恰恰是Agent系统发挥作用的地方。Agent不是单次问答,而是一个能持续感知、多步规划、自我纠正的执行体。今天我就用一个简化版的平交道预警Agent,拆解如何通过冗余感知、层级决策、失败重试来提升可靠性。

Agent架构拆解:从感知到闭环

一个可靠的预警Agent通常包含四个核心层:

  1. 感知层:不止一个传感器。摄像头+雷达+轨旁传感器,避免单个失效。
  2. 规划层:根据当前状态选择下一步行动。不是“直接刹车”,而是“减速→观察→决策”。
  3. 工具层:调用实际设备(栏杆、扬声器、列车无线通信)。
  4. 记忆与异常处理:记住最近的失败历史,并在执行时触发重试或升级。

平交道场景的流程如下:

text
1 2 3 4 5
State: 列车接近 (速度200m/s, 距离2km)
 → 感知层: 雷达确认位置, 摄像头确认道口无车辆
 → 规划层: 计算到达时间30秒 → 执行动作: 降栏杆, 响起警报
 → 工具层: 调用栏杆电机, 返回成功?
 → 若失败: 重试3次 → 若仍失败: 升级为“紧急远程刹车”指令

核心流程图

level crossing train warning agent decision flow diagram

图中展示了一个多路径决策树:正常情况下走主路径(降栏杆+警告);如果栏杆降下失败,Agent会先重试,同时强制开启道口全时广播,并通知列车减速。

关键实现细节和踩坑记录

1. 冗余感知与融合

只用摄像头?下雨、逆光、遮挡都会导致漏检。至少需要:

  • 视觉模型(YOLOv8)检测车辆/行人
  • 雷达/激光雷达测距测速
  • 轨旁传感器(如地磁线圈)确认列车通过

实际项目中常见的坑:两个感知源各自输出置信度,但Agent不知道信任谁。我的做法是投票融合:如果视觉检测到“有车”且雷达也确认“有障碍物”,则置高优先级;如果只有一个源报告,则降低阈值但依然执行动作——宁可误报,不可漏报。

2. 层级决策 vs 线性 if-else

简单的 if-else 链容易遗漏异常组合。比如:栏杆降下但仍有车辆闯入,怎么做?我推荐用状态机 + 规则引擎

python
1 2 3 4 5
# 伪代码
if state == 'BARRIERS_DOWN' and obstacle_detected:
    action = 'ACTIVATE_BELL + SEND_EMERGENCY_STOP_TO_TRAIN'
elif state == 'BARRIERS_DOWN' and not obstacle_detected:
    action = 'CLEAR_TO_PASS'

3. 失败重试与升级

执行工具被调用后,Agent必须等待反馈。没有确认信号?默认是失败。设定重试次数和间隔(如3次,每次等待0.5秒),重试失败后必须升维——从“局部控制”升级到“全局制动”,而不是静默忽略。

简化版动手实现:30行预警Agent核心

下面是一个最简版本,用Python演示感知→规划→执行→失败重试的循环:

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
import time
import random

class WarningAgent:
    def __init__(self):
        self.retry_limit = 3
        self.sensor_history = []

    def sense(self):
        # 模拟两个传感器:摄像头和雷达
        cam = random.random() < 0.9  # 90%正确
        radar = random.random() < 0.95
        return {'cam_obstacle': cam, 'radar_obstacle': radar}

    def decide(self, sensor_data):
        # 投票逻辑
        votes = sum([sensor_data['cam_obstacle'], sensor_data['radar_obstacle']])
        if votes >= 2:
            return 'ACTIVATE_WARNING'
        elif votes == 1:
            return 'ACTIVATE_WARNING_LOW_CONFIDENCE'  # 仍执行,但标记
        else:
            return 'WAIT'

    def execute(self, action):
        # 模拟执行工具(栏杆电机)
        if action == 'ACTIVATE_WARNING' or action == 'ACTIVATE_WARNING_LOW_CONFIDENCE':
            # 模拟90%成功率
            if random.random() < 0.9:
                return True
            else:
                return False
        return True

    def run(self):
        while True:
            data = self.sense()
            action = self.decide(data)
            if action == 'WAIT':
                continue
            success = False
            for attempt in range(self.retry_limit):
                success = self.execute(action)
                if success:
                    break
                time.sleep(0.5)
            if not success:
                # 升级:紧急通知列车减速
                print("[AGENT] 栏杆失败!发送紧急制动信号给列车")
            time.sleep(0.1)

agent = WarningAgent()
# 实际运行需中断,此处用于演示结构
agent.run()

这段代码让你看到Agent的核心差异:它不只会“回答”,还会持续感知、判断是否成功、失败后重试并升级。在真实部署中,你需要把训练好的YOLO模型替换sense函数,把真实的机械臂控制API换成execute

对开发者的行动建议

  1. 审查你的预警/告警系统:是否只有一个感知源?是否在动作失败后有重试和升级机制?如果没有,借鉴本文的决策流程。
  2. 多Agent协作初步:可以考虑将感知Agent、决策Agent、执行Agent分离,各自独立容错。
  3. 从模拟开始:用本文的30行框架,套用到你自己的场景(如服务器异常检测后自动重启、安全监控中的异常响应),先跑通闭环。

事故调查结论尚未公布,但技术社区可以在代码层面提前思考:当Agent发出信号,谁负责确认它被执行?这不仅是工程问题,也是责任归属问题。设计一个有容错、有回溯能力的Agent,可能是我们能为物理世界安全做的微小贡献。