先看效果

railroad crossing violation detection demo

上周比利时一辆校车在道口栏杆已经放下后仍试图穿越,被火车撞击导致4死5伤。这种悲剧并非孤例,全球每年因道口违规造成的死亡超过5000起。

作为开发者,我们可以用AI视觉技术实时检测道口区域,当行人或车辆在栏杆放下后闯入轨道时立刻报警,为司机和安全员提供额外预警。

读完本文你将学会:

  • 用YOLOv8轻量模型检测道口区域的人和车
  • 结合状态判断(栏杆状态/轨道区域)触发报警
  • 搭建实时视频流处理+报警推送的后端
  • 了解边缘设备部署的注意事项

项目完整代码可跑通,只需一个普通USB摄像头或视频源。

技术选型

组件 方案 理由
目标检测 YOLOv8n 参数量3.2M,在树莓派4上也跑得动(15FPS以上)
视频处理 OpenCV + imutils 轻量、跨平台、帧处理灵活
后端框架 FastAPI + WebSocket 低延迟推送报警事件
前端展示 原生HTML + JS Canvas 零依赖,直接嵌入任何平台
部署环境 Docker 或 NVIDIA Jetson 容器化后一次部署到处跑

为什么不选更复杂的方案?

  • 不需要多目标跟踪(只需判断有无违规,不需要ID)
  • 不需要语义分割(检测框加区域判断足够)
  • YOLOv8n在CPU上也能运行(2核2.5GHz下约20FPS)

核心代码实现

1. 模型加载与区域定义

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
# detection/model.py
from ultralytics import YOLO
import cv2
import numpy as np

class CrossingDetector:
    def __init__(self, model_path="yolov8n.pt"):
        self.model = YOLO(model_path)
        # 定义轨道区域(四点多边形)
        self.track_zone = np.array([
            [100, 400], 
            [600, 400], 
            [700, 600],
            [0, 600]
        ], dtype=np.int32)
        # 违规目标类型(人、二轮车、汽车、巴士)
        self.violation_classes = [0, 1, 2, 3, 5]  # COCO索引
        
    def is_in_zone(self, bbox):
        """判断目标框下边缘中心点是否在轨道区域内"""
        x1, y1, x2, y2 = bbox
        center_bottom = ((x1 + x2) // 2, y2)
        return cv2.pointPolygonTest(self.track_zone, center_bottom, False) >= 0

2. 实时检测与违规判断

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# detection/stream.py
import cv2
from ultralytics import YOLO
detector = CrossingDetector()

def process_frame(frame, barrier_closed=False):
    """
    :param frame: 当前视频帧 (numpy array)
    :param barrier_closed: 栏杆是否关闭(可由外部传感器或视觉识别提供)
    :return: (annotated_frame, violations) 
    """
    results = detector.model(frame, conf=0.5)[0]
    violations = []
    
    for box in results.boxes:
        cls = int(box.cls[0])
        if cls not in detector.violation_classes:
            continue
        bbox = box.xyxy[0].cpu().numpy().astype(int)
        if detector.is_in_zone(bbox):
            # 如果栏杆关闭并且目标在轨道内,视为违规
            if barrier_closed:
                violations.append({
                    "type": results.names[cls],
                    "bbox": bbox.tolist(),
                    "confidence": float(box.conf[0])
                })
            # 绘制矩形和标签
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0,0,255), 2)
            cv2.putText(frame, "VIOLATION!", (bbox[0], bbox[1]-10), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255), 2)
    
    # 绘制轨道区域
    cv2.polylines(frame, [detector.track_zone], True, (255,0,0), 2)
    return frame, violations

3. 报警推送(WebSocket)

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
# app.py 核心片段
from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.websocket("/ws/alerts")
async def alert_stream(websocket: WebSocket):
    await websocket.accept()
    cap = cv2.VideoCapture(0)  # 摄像头0
    barrier_closed = False  # 模拟栏杆状态
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # 模拟栏杆状态(时间每5秒切换一次,或根据外部信号)
        if int(time.time()) % 10 < 5:
            barrier_closed = True
        else:
            barrier_closed = False
        
        annotated_frame, violations = process_frame(frame, barrier_closed)
        if violations:
            # 发送报警数据
            await websocket.send_json({
                "timestamp": time.time(),
                "violations": violations,
                "barrier_state": "closed" if barrier_closed else "open"
            })
        
        # 可选:推送视频帧到前端(为实时展示,可压缩)
        _, buffer = cv2.imencode('.jpg', annotated_frame, [cv2.IMWRITE_JPEG_QUALITY, 50])
        await websocket.send_bytes(buffer.tobytes())
        
        await asyncio.sleep(0.03)  # ~30FPS

项目结构

text
1 2 3 4 5 6 7 8 9 10 11
railway_monitor/
├── detection/
│   ├── __init__.py
│   ├── model.py         # YOLO加载 + 区域定义
│   └── stream.py        # 帧处理逻辑
├── static/
│   ├── index.html       # 前端页面(WebSocket接收视频+报警)
│   └── app.js           # 前端逻辑
├── app.py               # FastAPI入口
├── requirements.txt
└── Dockerfile

requirements.txt:

text
1 2 3 4 5 6 7
fastapi
uvicorn
opencv-python-headless
ultralytics
numpy
imutils
websockets

启动命令:

bash
1 2
pip install -r requirements.txt
uvicorn app:app --host 0.0.0.0 --port 8000

前端访问 http://localhost:8000 即可看到实时摄像头画面和报警信息。

上线要注意的坑

1. 栏杆状态获取是最大难点

代码中用时间模拟了栏杆状态,实战中需要真实获取。

  • 方案A:视觉识别栏杆角度(额外训练一个分类器检测栏杆水平/垂直)
  • 方案B:对接道口信号系统(通过MQTT或Modbus读取继电器状态)
  • 方案C:简单方案——只检测「有人在轨道内」即报警(不考虑栏杆状态),但误报率高(维修人员等)。

我的建议:先采用方案C快速上线,因为99%的违规穿越都发生在栏杆关闭后。后续再引入栏杆检测提升准确率。

2. 光照与天气

YOLOv8在白天很好用,到了夜晚或雨雾天效果骤降。需要:

  • 添加红外摄像头(或者普通摄像头+补光灯)
  • 在模型推理前做自适应直方图均衡化(CLAHE)
  • 降低置信度阈值(例如0.3),但会增加误报
python
1 2 3 4 5 6
def preprocess(frame):
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    l = clahe.apply(l)
    return cv2.merge([l, a, b])

3. 延迟与性能

  • 使用FastAPI+WebSocket推送压缩后的JPEG(Quality 50)可将每帧大小从2MB降到50KB
  • 在边缘设备上(如Jetson Nano),关闭显示窗口,只做推理和推送
  • 建议batch处理:如果每秒处理15帧,报警频率足够,不需要满30FPS

4. 误报抑制

连续3帧都检测到违规才触发报警(而不是单帧),避免飞鸟、树叶干扰。

python
1 2 3 4 5 6 7 8
# 简单的下采样过滤
violation_history = []

def filter_violations(violations):
    violation_history.append(bool(violations))
    if len(violation_history) > 5:
        violation_history.pop(0)
    return sum(violation_history) >= 3  # 5帧中至少3帧有违规

5. 隐私合规

部署在公共场所需要告知并模糊人脸/车牌。可以在报警截图时自动打码:

python
1
frame = cv2.GaussianBlur(roi, (51,51), 0)  # 对检测框内模糊

写在最后

比利时的事故再次提醒我们,科技不该只存在于炫酷Demo里,更应该下沉到安全场景。这套系统成本仅几百元(树莓派+USB摄像头),就能覆盖一个道口,24小时值守。

下一步可以做什么?

  • 接入声光报警器(GPIO控制)
  • 通过4G模块发送短信给道口值班人员
  • 长期积累违规数据,训练专属模型(比如只检测校车这类高风险目标)

去GitHub搜 railway-crossing-ai 我可放了一个更完整的版本(含硬件接入)。如果你跑通了,记得留言告诉我改进了什么。

railroad crossing detection deployment