Wei Zhang
Published © Apache-2.0

Install a simple ADAS system on old vehicles using an RK3576 box

Install a simple ADAS system on old vehicles using an RK3576 board and AI Lab, with a USB webcam and Bluetooth alerts. Setup takes about 2 hours and reaches about 90% detection, so you can drive more safely right away.

Beginner · Full instructions provided · 2 hours · 38

Things used in this project

Hardware components

USB webcam
×1
Seeed Studio reComputer RK3576
×1

Software apps and online services

Crazyflie Python Client
Bitcraze Crazyflie Python Client
Seeed Studio reComputer AI Lab

Story

Read more

Code

detect_video_realtime

Python
This file contains inference instructions based on AI Lab, along with encapsulated scripts for real-time inference and voice feedback.
#!/usr/bin/env python3
"""
Real-time video detection with YOLO11 via image upload API.
Supports video files, webcam, and single images.

Usage:
  python3 detect_video_realtime.py camera                    # webcam
  python3 detect_video_realtime.py camera --interval 5        # detect every 5 frames
  python3 detect_video_realtime.py test.mp4                   # video file
  python3 detect_video_realtime.py test.mp4 --save out.mp4    # save result video
  python3 detect_video_realtime.py 123.png                    # single image
  python3 detect_video_realtime.py camera --resize 640x640    # resize before upload
  python3 detect_video_realtime.py camera --alert              # enable voice alerts
  python3 detect_video_realtime.py camera --alert --alert-area 0.1  # alert with custom area ratio
"""

import os
# These Qt settings are exported before `cv2` is imported further down:
# select the "xcb" (X11) platform plugin and switch off Qt debug and
# qt.qpa.* log categories.
# NOTE(review): presumably they target the Qt-based window backend used by
# cv2.imshow — confirm on the target device.
os.environ["QT_QPA_PLATFORM"] = "xcb"
os.environ["QT_LOGGING_RULES"] = "*.debug=false;qt.qpa.*=false"

import requests
import cv2
import sys
import time
import argparse
import io
import subprocess
from pathlib import Path

# Endpoint that predict_frame() POSTs each JPEG-encoded frame to.
API_URL = "http://127.0.0.1:8000/api/models/yolo11/predict"

# Detection classes that check_alert() is allowed to raise an alert for.
ALERT_CLASSES = ['person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck']
# NOTE(review): not referenced anywhere in the visible code; process_video()
# and the --alert-area option both hard-code the same 0.15 default.
ALERT_AREA_THRESHOLD = 0.15

# Text spoken / overlaid for each alert class; process_video() falls back to
# 'Obstacle ahead' for a class that has no entry here.
ALERT_MESSAGES = {
    'person': 'Pedestrian ahead',
    'bicycle': 'Bicycle ahead',
    'car': 'Vehicle too close ahead',
    'motorcycle': 'Motorcycle ahead',
    'bus': 'Bus ahead',
    'truck': 'Truck ahead',
}

# Per-class colours handed to the cv2 drawing calls in draw_detections();
# classes missing from this table are drawn in (200, 200, 200).
# NOTE(review): OpenCV interprets colour tuples as (B, G, R) on BGR frames.
COCO_COLORS = {
    'person': (0, 255, 0), 'car': (255, 200, 0), 'truck': (0, 100, 255),
    'bus': (0, 200, 255), 'bicycle': (200, 200, 0), 'motorcycle': (0, 200, 200),
    'dog': (150, 150, 255), 'cat': (200, 100, 100),
}


def draw_detections(frame, predictions, frame_size=None, sent_size=None, conf_threshold=0.0):
    """Draw a labelled bounding box on ``frame`` for every prediction.

    The frame is modified in place and also returned.

    Args:
        frame: Image array to draw on.
        predictions: Iterable of dicts with optional ``'class'``,
            ``'confidence'`` and ``'box'`` (``x1``/``y1``/``x2``/``y2``) keys.
        frame_size: ``(width, height)`` of ``frame``; taken from
            ``frame.shape`` when omitted.
        sent_size: ``(width, height)`` of the image that was uploaded for
            detection; defaults to ``frame_size``.
        conf_threshold: Predictions with a lower confidence are skipped.

    Returns:
        The same ``frame`` object that was passed in.

    Note:
        Box coordinates are scaled by ``frame_size / sent_size``. This assumes
        the API reports boxes in the pixel space of the uploaded image, which
        is the same assumption ``check_alert`` relies on — TODO confirm
        against the API. The previous hard-coded factors (3 and 2.1635)
        ignored both size arguments and only fit one specific resolution.
    """
    fw, fh = frame_size if frame_size else (frame.shape[1], frame.shape[0])
    sw, sh = sent_size if sent_size else (fw, fh)
    # Scale from uploaded-image coordinates back to display-frame coordinates.
    sx = fw / sw if sw > 0 else 1.0
    sy = fh / sh if sh > 0 else 1.0

    for pred in predictions:
        cls_name = pred.get('class', '?')
        conf = pred.get('confidence', 0)
        if conf < conf_threshold:
            continue

        box = pred.get('box', {})

        x1 = int(box.get('x1', 0) * sx)
        y1 = int(box.get('y1', 0) * sy)
        x2 = int(box.get('x2', 0) * sx)
        y2 = int(box.get('y2', 0) * sy)

        color = COCO_COLORS.get(cls_name, (200, 200, 200))
        label = f"{cls_name} {conf:.2f}"

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
        # Put the label above the box, or just inside it when the box touches
        # the top edge and the label would otherwise fall outside the frame.
        label_bottom = y1 if y1 - th - 6 >= 0 else y1 + th + 6
        cv2.rectangle(frame, (x1, label_bottom - th - 6), (x1 + tw + 4, label_bottom), color, -1)
        cv2.putText(frame, label, (x1 + 2, label_bottom - 4),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)

    return frame


def speak_alert(message):
    """Speak ``message`` aloud with the ``espeak`` command-line tool.

    The call blocks until ``espeak`` exits; its output is captured and
    discarded. If ``espeak`` cannot be started (for example because it is not
    installed) a warning is printed once and the call returns normally, so a
    missing speech engine does not abort the detection loop.

    Args:
        message: Text to speak.

    NOTE(review): the ``zh`` voice is requested although the alert messages
    in this file are English — confirm that this is intended.
    """
    try:
        subprocess.run(['espeak', '-v', 'zh', message], capture_output=True)
    except OSError as exc:
        # Warn only once; this function may be called for many frames in a row.
        if not getattr(speak_alert, '_warned', False):
            speak_alert._warned = True
            print(f"\n[Warn] Voice alert unavailable: {exc}")


def check_alert(predictions, frame_area, conf_threshold, area_threshold):
    """Find the first detection that is large enough to warrant an alert.

    A detection qualifies when its class is listed in ``ALERT_CLASSES``, its
    confidence is not below ``conf_threshold``, and its box area divided by
    ``frame_area`` is greater than ``area_threshold``.

    Args:
        predictions: Iterable of detection dicts (``'class'``,
            ``'confidence'``, ``'box'`` with ``x1``/``y1``/``x2``/``y2``).
        frame_area: Area used as the denominator of the coverage ratio;
            nothing can qualify unless it is positive.
        conf_threshold: Minimum confidence for a detection to be considered.
        area_threshold: Coverage ratio that must be exceeded.

    Returns:
        ``(True, class_name)`` for the first qualifying detection, otherwise
        ``(False, None)``.
    """
    for candidate in predictions:
        if candidate.get('class') in ALERT_CLASSES and not candidate.get('confidence', 0) < conf_threshold:
            coords = candidate.get('box', {})
            width = coords.get('x2', 0) - coords.get('x1', 0)
            height = coords.get('y2', 0) - coords.get('y1', 0)
            if frame_area > 0:
                coverage = (width * height) / frame_area
                if coverage > area_threshold:
                    return True, candidate['class']
    return False, None


def predict_frame(frame, resize=None):
    """Upload one frame to the detection API and return its JSON reply.

    The frame is optionally resized, JPEG-encoded (quality 85) and POSTed to
    ``API_URL`` as a multipart file named ``frame.jpg``.

    Args:
        frame: Image array to analyse.
        resize: Optional ``(width, height)`` to resize to before upload.

    Returns:
        ``(result, sent_width, sent_height)`` where ``result`` is the decoded
        JSON object, or ``(None, 0, 0)`` when encoding fails, the request
        fails, the server answers with a status other than 200, or the body
        is not a JSON object. Every failure is reported on stdout instead of
        being dropped silently.
    """
    if resize:
        sw, sh = resize
        img = cv2.resize(frame, (sw, sh))
    else:
        sh, sw = frame.shape[:2]
        img = frame

    success, enc = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 85])
    if not success:
        print("\n[Error] JPEG encoding failed")
        return None, 0, 0

    img_bytes = io.BytesIO(enc.tobytes())
    try:
        resp = requests.post(API_URL, files={'file': ('frame.jpg', img_bytes, 'image/jpeg')}, timeout=15)
    except requests.RequestException as e:
        print(f"\n[Error] API request failed: {e}")
        return None, 0, 0

    if resp.status_code != 200:
        print(f"\n[Error] API returned HTTP {resp.status_code}")
        return None, 0, 0

    try:
        data = resp.json()
    except ValueError as e:
        print(f"\n[Error] API returned invalid JSON: {e}")
        return None, 0, 0

    # Callers use ``result.get(...)``, so anything but a JSON object is a failure.
    if not isinstance(data, dict):
        print("\n[Error] API returned an unexpected JSON payload")
        return None, 0, 0
    return data, sw, sh


# Minimum number of seconds between two spoken alerts for the same object class.
_ALERT_COOLDOWN_S = 3.0


def process_video(input_source, interval=1, save_path=None, resize=None, no_display=False, conf_threshold=0.4, alert_enabled=False, alert_area=0.15):
    """Run detection over a camera stream or video file, frame by frame.

    Every ``interval``-th frame is sent to the API through ``predict_frame``.
    The most recent predictions are drawn on every frame, optionally written
    to ``save_path`` and shown in a window until 'q' or ESC is pressed.

    Args:
        input_source: ``"camera"`` for capture device 0, otherwise a path
            handed to ``cv2.VideoCapture``.
        interval: Detect on every N-th frame; values below 1 are treated as 1.
        save_path: Optional path of an ``mp4v`` video to write.
        resize: Optional ``(width, height)`` to resize frames to before upload.
        no_display: When true, no window is opened.
        conf_threshold: Minimum confidence for drawing and alerting.
        alert_enabled: When true, close objects trigger ``speak_alert``.
        alert_area: Box-area / frame-area ratio above which an alert fires.

    Exits the process with status 1 if the source cannot be opened.
    """
    if interval < 1:
        # ``frame_idx % 0`` would raise ZeroDivisionError in the loop below.
        print(f"[Warn] Invalid interval {interval}; falling back to 1")
        interval = 1

    if input_source == "camera":
        cap = cv2.VideoCapture(0)
        source_name = "Camera"
    else:
        cap = cv2.VideoCapture(input_source)
        source_name = Path(input_source).name

    if not cap.isOpened():
        cap.release()
        print(f"[Error] Cannot open: {input_source}")
        sys.exit(1)

    video_writer = None
    saved = False
    frame_idx = 0
    api_call_count = 0
    api_time_cost = 0.0

    # try/finally guarantees the capture and writer are released even when
    # a frame fails to process or the user interrupts with Ctrl+C.
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30

        frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"[Detect] Source: {source_name}")
        print(f"[Detect] Size: {frame_w}x{frame_h}, FPS: {fps:.2f}")
        if resize:
            print(f"[Detect] Upload resize: {resize[0]}x{resize[1]}")
        print(f"[Detect] Interval: every {interval} frame(s)")
        print("[Detect] Press 'q' or ESC to quit\n")

        if save_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            video_writer = cv2.VideoWriter(save_path, fourcc, fps, (frame_w, frame_h))
            if video_writer.isOpened():
                saved = True
            else:
                # Do not claim a file was saved when the writer never opened.
                print(f"[Warn] Cannot open video writer for: {save_path}")
                video_writer.release()
                video_writer = None

        last_predictions = []
        last_sent_size = (frame_w, frame_h)
        last_alert_time = None
        last_alert_type = None
        run = True

        while run:
            ret, frame = cap.read()
            if not ret:
                if total_frames > 0:
                    print("\n[Detect] End of video")
                break

            current_time = frame_idx / fps

            if frame_idx % interval == 0:
                t0 = time.time()
                result, sw, sh = predict_frame(frame, resize)
                dt = time.time() - t0

                if result and result.get('success'):
                    last_predictions = result.get('predictions', [])
                    if sw > 0 and sh > 0:
                        last_sent_size = (sw, sh)
                    # Time only successful calls so the reported average is
                    # total time / number of calls over the same set of calls.
                    api_call_count += 1
                    api_time_cost += dt
                    info = f"\r[Detect] Frame {frame_idx}"
                    if total_frames > 0:
                        info += f"/{total_frames}"
                    info += f" | {current_time:.1f}s | Det: {len(last_predictions)} | API: {dt*1000:.0f}ms"
                    print(info, end='', flush=True)

            alerted = False
            obj_type = None
            if alert_enabled and last_predictions:
                alerted, obj_type = check_alert(last_predictions, last_sent_size[0] * last_sent_size[1], conf_threshold, alert_area)
                if alerted:
                    # speak_alert blocks until speech ends; repeating it on
                    # every frame would stall the video, so rate-limit it.
                    now = time.monotonic()
                    due = (
                        last_alert_time is None
                        or obj_type != last_alert_type
                        or now - last_alert_time >= _ALERT_COOLDOWN_S
                    )
                    if due:
                        speak_alert(ALERT_MESSAGES.get(obj_type, 'Obstacle ahead'))
                        last_alert_time = time.monotonic()
                        last_alert_type = obj_type

            display = frame.copy()
            draw_detections(display, last_predictions, frame_size=(frame_w, frame_h), sent_size=last_sent_size, conf_threshold=conf_threshold)

            overlay_lines = [
                f"Frame: {frame_idx}" + (f"/{total_frames}" if total_frames > 0 else ""),
                f"Time: {current_time:.1f}s",
                f"Detections: {len(last_predictions)}",
                f"API calls: {api_call_count}",
            ]
            if alerted:
                # The visual warning stays on for every alerted frame.
                alert_text = ALERT_MESSAGES.get(obj_type, 'Obstacle ahead')
                overlay_lines.append(f"ALERT: {alert_text}")
                cv2.putText(display, f"WARNING {alert_text}", (frame_w // 2 - 240, 80),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
            for i, line in enumerate(overlay_lines):
                cv2.putText(display, line, (12, frame_h - 60 + i * 18),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)

            if video_writer:
                video_writer.write(display)

            if not no_display:
                cv2.imshow(f"YOLO11 - {source_name}", display)
                key = cv2.waitKey(1) & 0xFF
                if key in (ord('q'), 27):
                    run = False

            frame_idx += 1
    finally:
        cap.release()
        if video_writer:
            video_writer.release()
        if not no_display:
            # No window is ever created in headless mode, so nothing to destroy.
            cv2.destroyAllWindows()

    avg_ms = (api_time_cost / api_call_count * 1000) if api_call_count > 0 else 0
    print(f"\n[Detect] Done. Frames: {frame_idx}, API calls: {api_call_count}, Avg API: {avg_ms:.0f}ms")
    if saved:
        print(f"[Detect] Saved: {save_path}")


def process_image(img_path, resize=None, no_display=False, conf_threshold=0.4):
    """Run detection on a single image and save an annotated copy.

    The annotated image is written to ``detect_<stem>.jpg`` in the current
    working directory and, unless ``no_display`` is set, shown in a window
    until a key is pressed.

    Args:
        img_path: Path of the image to analyse.
        resize: Optional ``(width, height)`` to resize to before upload.
        no_display: When true, no window is opened.
        conf_threshold: Minimum confidence for a detection to be drawn.

    Exits the process with status 1 if the image cannot be read or the API
    prediction fails.
    """
    frame = cv2.imread(img_path)
    if frame is None:
        print(f"[Error] Cannot read image: {img_path}")
        sys.exit(1)

    h, w = frame.shape[:2]
    print(f"[Detect] Image: {Path(img_path).name} ({w}x{h})")

    result, sw, sh = predict_frame(frame, resize)
    if not result or not result.get('success'):
        print("[Error] API prediction failed")
        sys.exit(1)

    sent_size = (sw, sh) if sw > 0 and sh > 0 else (w, h)
    predictions = result.get('predictions', [])
    print(f"[Detect] Got {len(predictions)} detections")

    display = frame.copy()
    draw_detections(display, predictions, frame_size=(w, h), sent_size=sent_size, conf_threshold=conf_threshold)

    out_name = f"detect_{Path(img_path).stem}.jpg"
    # cv2.imwrite reports failure through its return value; do not claim the
    # file was saved when it was not.
    if cv2.imwrite(out_name, display):
        print(f"[Detect] Saved: {out_name}")
    else:
        print(f"[Error] Could not write: {out_name}")

    if not no_display:
        cv2.imshow(f"YOLO11 - {Path(img_path).name}", display)
        print("[Detect] Press any key to exit...")
        cv2.waitKey(0)
        cv2.destroyAllWindows()


def main():
    os.environ.setdefault("QT_QPA_FONTDIR", "/usr/share/fonts/truetype")

    parser = argparse.ArgumentParser(description="Real-time video detection with YOLO11")
    parser.add_argument("input", nargs='?', default="camera",
                        help="Input: video file, image file, or 'camera' (default)")
    parser.add_argument("--interval", type=int, default=1,
                        help="Detect every N frames (default: 1)")
    parser.add_argument("--conf", type=float, default=0.4,
                        help="Confidence threshold (default: 0.4)")
    parser.add_argument("--save", help="Save output video to path")
    parser.add_argument("--resize", help="Resize before upload, e.g. 640x640")
    parser.add_argument("--no-display", action="store_true",
                        help="No GUI display (headless)")
    parser.add_argument("--alert", action="store_true",
                        help="Enable voice alerts for close objects")
    parser.add_argument("--alert-area", type=float, default=0.15,
                        help="Alert when box area exceeds this ratio of frame (default: 0.15)")
    args = parser.parse_args()

    resize = None
    if args.resize:
        try:
            rw, rh = args.resize.split('x')
            resize = (int(rw), int(rh))
        except ValueError:
            print("[Error] --resize format must be WxH, e.g. 640x640")
            sys.exit(1)

    ext = Path(args.input).suffix.lower()
    img_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.webp'}

    if args.input == "camera" or os.path.isdir(args.input):
        process_video(args.input, args.interval, args.save, resize, args.no_display, args.conf,
                      args.alert, args.alert_area)
    elif ext in img_exts and os.path.isfile(args.input):
        process_image(args.input, resize, args.no_display, args.conf)
    elif os.path.isfile(args.input):
        process_video(args.input, args.interval, args.save, resize, args.no_display, args.conf,
                      args.alert, args.alert_area)
    else:
        print(f"[Error] Input not found: {args.input}")
        sys.exit(1)


if __name__ == '__main__':
    main()

Credits

Wei Zhang
3 projects • 0 followers
AE

Comments