Lester Liu, Anh Pham, Han Feng, Elliot Metcalf
Published © LGPL

Skyrover

A Raspberry Pi 4-powered autonomous car that follows blue lane lines and automatically stops at red stop boxes.

Intermediate · Protip · 10 hours

Things used in this project

Hardware components

Raspberry Pi 4 Model B
×1
Webcam, Logitech® HD Pro
×1
Portable Battery
×1
Optical Speed Encoder
×1
Portable Charger
×1
DC motor (generic)
×1

Software apps and online services

OpenCV

Story


Code

RC_Car_control_final.py

Python
import cv2
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import time
import os
import RPi.GPIO as GPIO

# Create output directories for images and plots
OUTPUT_DIR = "output_images"
PLOTS_DIR = "plots"
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)
if not os.path.exists(PLOTS_DIR):
    os.makedirs(PLOTS_DIR)

# GPIO setup
GPIO.setmode(GPIO.BCM)  

# Motor and servo setup
GPIO.setup(18, GPIO.OUT)  # Initializes GPIO 18 for motor
GPIO.setup(19, GPIO.OUT)  # Initializes GPIO 19 for steering

pwm_esc = GPIO.PWM(18, 50)  # Creates a PWM at GPIO 18 running at 50Hz
pwm_serv = GPIO.PWM(19, 50)  # Creates a PWM at GPIO 19 running at 50Hz

pwm_esc.start(7.5)  # Puts DC motor on neutral
pwm_serv.start(7.5)  # Sets servo motor straight
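
# At 50 Hz the PWM period is 20 ms, so a 7.5% duty cycle is a 1.5 ms pulse --
# the standard RC neutral position. The duty cycles used below therefore stay
# within roughly 6.0-9.0 (about 1.2-1.8 ms pulses) around that neutral point.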

# Speed encoder setup
ENCODER_PIN = 6  # GPIO pin for the encoder's OUT pin
GPIO.setup(ENCODER_PIN, GPIO.IN)  

# Constants for speed calculation
PULSES_PER_REVOLUTION = 20  # encoder slots per wheel revolution
WHEEL_CIRCUMFERENCE = 0.2  # wheel circumference in metres
SPEED_CHANGE = 0.001  # duty-cycle step per speed adjustment
ENCODER_MAX_SPEED = 5.0  # km/h; slow down above this
ENCODER_MIN_SPEED = 1.0  # km/h; speed up below this

# Variables for speed encoder
last_activation_time = 0  # To store the time of the last encoder activation
current_speed_kmh = 0  # Current speed in km/h
speed_vals_kmh = []  # To store actual speed values for plotting

# Callback function to handle encoder activations and calculate speed
def encoder_callback(channel):
    global last_activation_time, current_speed_kmh
    current_time = time.time()
    # Skip the first activation since we need two timestamps
    if last_activation_time != 0:  
        time_between_activations = current_time - last_activation_time
        if time_between_activations > 0:
            frequency = 1 / time_between_activations  # pulses per second
            rpm = (frequency / PULSES_PER_REVOLUTION) * 60  # wheel revolutions per minute
            speed_mps = (rpm / 60) * WHEEL_CIRCUMFERENCE  # metres per second
            current_speed_kmh = speed_mps * 3.6  # m/s to km/h
    
    last_activation_time = current_time
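
# Worked example: pulses 10 ms apart give frequency = 100 Hz, so
# rpm = (100 / 20) * 60 = 300 and speed = (300 / 60) * 0.2 = 1.0 m/s = 3.6 km/h.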

# Add event detection for the encoder 
GPIO.add_event_detect(ENCODER_PIN, GPIO.FALLING, callback=encoder_callback, bouncetime=2)

# Camera
frame_width = 160
frame_height = 120
cam_idx = 0

# PD variables
kp = 0.08  # Proportional gain 
kd = kp * 0.3  # Derivative gain 

# Speed values (PWM duty cycles)
zero_speed = 7.5  # Car is stopped
base_speed = 7.9  # Cruising speed
zero_turn = 7.5  # Neutral steering

# Turn threshold for stop-and-turn
turn_threshold = 1.5  # Increased to reduce stops (was 1.2)
turn_wait_time = 0.3  # Wait time for turns

# Max number of loop iterations
max_ticks = 4000

def reset_car():
    # Set speed to zero and steering to neutral
    pwm_esc.ChangeDutyCycle(7.5)  
    pwm_serv.ChangeDutyCycle(7.5)  

def start_car():
    # Set speed to base speed and steering to neutral
    print("Car ready")
    pwm_esc.ChangeDutyCycle(base_speed)  
    pwm_serv.ChangeDutyCycle(7.5)  

def wait(wait_time):
    # Busy-wait for a period of time (spins instead of sleeping)
    start_time = time.perf_counter()  # Start
    end_time = start_time + wait_time  # End
    # Loop until finished
    while (time.perf_counter() < end_time):
        pass
    return
    
def detect_edges(frame):
    # Detect the blue lines
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # OpenCV hue runs 0-179; roughly 90-130 covers blue
    lower_blue = np.array([90, 50, 50], dtype="uint8")
    upper_blue = np.array([130, 255, 255], dtype="uint8")
    # Make mask
    mask = cv2.inRange(hsv, lower_blue, upper_blue)
    # Get edges
    edges = cv2.Canny(mask, 50, 100)
    return edges

def region_of_interest(edges):
    # Find region to look at
    height, width = edges.shape
    mask = np.zeros_like(edges)

    # Only focus lower half of the screen
    polygon = np.array([[
        (0, height),
        (0, height / 2),
        (width, height / 2),
        (width, height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)
    
    # Get region to look at 
    cropped_edges = cv2.bitwise_and(edges, mask)
    return cropped_edges

def detect_line_segments(cropped_edges):
    # Find line segments
    rho = 1
    theta = np.pi / 180
    min_threshold = 10
    
    # Get lines
    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=150)
    return line_segments

def average_slope_intercept(frame, line_segments):
    # Finding slope of the line
    lane_lines = []
    if line_segments is None:
        print("No line segments detected")
        return lane_lines

    # Set boundaries
    height, width, _ = frame.shape
    left_fit = []
    right_fit = []
    boundary = 1 / 3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    # Go through line segments and get line of best fit
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                continue
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)
            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))
    
    # Take average line of best fit for each side (skip empty sides to avoid
    # averaging an empty list, which would produce NaN)
    if len(left_fit) > 0:
        left_fit_average = np.average(left_fit, axis=0)
        lane_lines.append(make_points(frame, left_fit_average))
    if len(right_fit) > 0:
        right_fit_average = np.average(right_fit, axis=0)
        lane_lines.append(make_points(frame, right_fit_average))
    return lane_lines

def make_points(frame, line):
    # For getting visual of lines
    height, width, _ = frame.shape
    slope, intercept = line
    y1 = height  # Bottom of the frame
    y2 = int(y1 / 2)  # Make points from middle of the frame down
    if slope == 0:
        slope = 0.1
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)
    return [[x1, y1, x2, y2]]

def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    # Get visual of lines
    line_image = np.zeros_like(frame)
    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
    return line_image

def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    # Show a heading line
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape
    steering_angle_radian = steering_angle / 180.0 * math.pi
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)
    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
    return heading_image

def get_steering_angle(frame, lane_lines):
    # Get the angle to steer towards
    height, width, _ = frame.shape
    if len(lane_lines) == 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)
    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)
    elif len(lane_lines) == 0:
        x_offset = 0
        y_offset = int(height / 2)
    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    steering_angle = angle_to_mid_deg + 90
    return steering_angle
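
# Example: a single lane line from x1 = 40 to x2 = 70 gives x_offset = 30 with
# y_offset = 60 (half of the 120 px frame), so the deviation is
# atan(30 / 60) ~= 26 degrees and the returned steering angle is 116
# (90 means straight ahead; above 90 steers right, below 90 steers left).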

def manage_speed(curr_speed):
    # Adjust the speed based on the speed encoder
    global current_speed_kmh
    speed_adjustment = 0  # Default: no speed change
    if current_speed_kmh > ENCODER_MAX_SPEED:  # Too fast
        speed_adjustment = -SPEED_CHANGE  # Reduce speed
    elif current_speed_kmh < ENCODER_MIN_SPEED:  # Too slow
        speed_adjustment = SPEED_CHANGE  # Increase speed
    return speed_adjustment
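
# manage_speed() is simple bang-bang control: nudge the throttle duty cycle by
# SPEED_CHANGE whenever the measured speed leaves the 1.0-5.0 km/h band, and
# leave it unchanged while the speed stays inside the band.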

def plot_pd(p_vals, d_vals, error, show_img=False):
    # Plot the proportional, derivative, and error
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals))
    ax1.plot(t_ax, p_vals, '-', label="P values")
    ax1.plot(t_ax, d_vals, '-', label="D values")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error Value")
    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()
    plt.savefig(os.path.join(PLOTS_DIR, "pd_plot.png"))
    if show_img:
        plt.show()
    plt.clf()

def plot_pwm(speed_vals, turn_vals, speed_vals_kmh, error, show_img=False):
    # Plot the speed, steering, and error 
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_vals))
    ax1.plot(t_ax, speed_vals, '-', label="Speed Duty Cycle")
    ax1.plot(t_ax, turn_vals, '-', label="Steering Duty Cycle")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, np.array(error) / np.max(np.abs(error)), '--r', label="Error")
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("Duty Cycle")
    ax2.set_ylabel("Percent Error Value")
    plt.title("Speed, Steering, and Error vs. Time")
    fig.legend()
    plt.savefig(os.path.join(PLOTS_DIR, "voltage_plot.png"))
    if show_img:
        plt.show()
    plt.clf()

def isRedFloorVisible(image):
    # Returns True when more than `percentage` percent of the frame is red
    hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    cv2.imwrite(os.path.join(OUTPUT_DIR, "redfloor.jpg"), hsv_img)
    percentage = 25
    # Red wraps around the ends of the OpenCV hue range (0-179), so two masks
    lower_red1 = np.array([0, 40, 60], dtype="uint8")
    upper_red1 = np.array([10, 255, 255], dtype="uint8")
    lower_red2 = np.array([170, 40, 60], dtype="uint8")
    upper_red2 = np.array([180, 255, 255], dtype="uint8")
    mask1 = cv2.inRange(hsv_img, lower_red1, upper_red1)
    mask2 = cv2.inRange(hsv_img, lower_red2, upper_red2)
    mask = cv2.bitwise_or(mask1, mask2)
    output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)
    cv2.imwrite(os.path.join(OUTPUT_DIR, "redfloormask.jpg"), output)
    percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)
    result = percentage < percentage_detected
    if result:
        print(percentage_detected)
    return result, output
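
# Example: on a 160x120 mask (19200 pixels), 6000 red pixels gives
# 6000 * 100 / 19200 = 31.25% detected, which clears the 25% threshold.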

def main():
    lastTime = 0
    lastError = 0
    SecondStopTick = 0

    # Arrays for making the graphs
    p_vals = []  # Proportional
    d_vals = []  # Derivative
    err_vals = []  # Error
    speed_vals = []  # Speed values 
    steer_vals = []  # Steering values

    # Set up video
    video = cv2.VideoCapture(cam_idx)
    video.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
    video.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)

    try:
        # Start car
        start_car()
        curr_speed = base_speed
        counter = 0

        passedFirstStopSign = False
        while counter < max_ticks:
            print(f"Counter: {counter}, Speed: {curr_speed}, Actual Speed: {current_speed_kmh:.2f} km/h")
            # Manage video
            ret, original_frame = video.read()
            if not ret:
                print("Failed to capture frame")
                break

            frame = cv2.resize(original_frame, (160, 120))

            # Process the frame to determine the desired steering angle
            edges = detect_edges(frame)
            # Save edges every 10 frames 
            if counter % 10 == 0:
                cv2.imwrite(os.path.join(OUTPUT_DIR, f"edges_{counter}.jpg"), edges)
                cv2.imwrite(os.path.join(OUTPUT_DIR, f"frame_{counter}.jpg"), frame)  # Save original frame
            roi = region_of_interest(edges)
            line_segments = detect_line_segments(roi)
            lane_lines = average_slope_intercept(frame, line_segments)
            lane_lines_image = display_lines(frame, lane_lines)
            if counter % 10 == 0:
                cv2.imwrite(os.path.join(OUTPUT_DIR, f"lanes_{counter}.jpg"), lane_lines_image)  # Save lane lines
            steering_angle = get_steering_angle(frame, lane_lines)
            heading_image = display_heading_line(lane_lines_image, steering_angle)

            # Calculate changes for PD
            now = time.time()
            dt = now - lastTime if now - lastTime > 0 else 0.001  # Prevent division by zero
            deviation = steering_angle - 90

            # PD Code
            error = deviation
            base_turn = 7.5
            proportional = kp * error
            derivative = kd * (error - lastError) / dt

            # Take values for graphs
            p_vals.append(proportional)
            d_vals.append(derivative)
            err_vals.append(error)
            speed_vals.append(curr_speed)
            speed_vals_kmh.append(current_speed_kmh)

            # Determine actual turn to do
            turn_amt = base_turn + proportional + derivative
            # Clamp turn_amt to the desired range (6 to 9)
            turn_amt = max(6.0, min(9.0, turn_amt))
            steer_vals.append(turn_amt)

            # Print output every 10 frames
            if counter % 10 == 0:
                print(f"Steering_angle: {steering_angle}, Error: {error}, Turn_amt: {turn_amt}")

            # Stop-and-turn logic
            if abs(turn_amt - zero_turn) > turn_threshold:
                # Significant turn: stop, turn, then resume
                print(f"Stopping to turn: turn_amt={turn_amt}, steering_angle={steering_angle}")
                #pwm_esc.ChangeDutyCycle(zero_speed)  
                pwm_esc.ChangeDutyCycle(7.6)
                pwm_serv.ChangeDutyCycle(turn_amt)  
                wait(turn_wait_time)  
                pwm_esc.ChangeDutyCycle(curr_speed)  
            else:
                # Minor turn: steer while moving
                pwm_esc.ChangeDutyCycle(curr_speed)  
                pwm_serv.ChangeDutyCycle(turn_amt)  

            # Speed encoding: adjust speed based on encoder readings
            if counter % 3 == 0:  # Check every 3 ticks 
                speed_adjustment = manage_speed(curr_speed)
                temp_speed = curr_speed + speed_adjustment
                temp_speed = max(7.5, min(8.5, temp_speed))
                if temp_speed != curr_speed:
                    curr_speed = temp_speed
                    pwm_esc.ChangeDutyCycle(curr_speed)

            # Stop if sees the red box
            if counter % 20 == 0:  # Check every 20 ticks to reduce processing
                if not passedFirstStopSign:
                    isStopSign, floorSight = isRedFloorVisible(frame)
                    if isStopSign:
                        print("First red box detected! Stopping for 5 seconds...")
                        pwm_esc.ChangeDutyCycle(zero_speed)  # Stop the car
                        wait(5)  # Wait for 5 seconds
                        print("Resuming after 5 seconds...")
                        passedFirstStopSign = True
                        SecondStopTick = counter
                        pwm_esc.ChangeDutyCycle(curr_speed)  # Resume moving
                # Stopping permanently if second red box detected
                elif passedFirstStopSign and counter > SecondStopTick + 100:
                    isStopSign, _ = isRedFloorVisible(frame)
                    if isStopSign:
                        print("Second red box detected! Stopping permanently...")
                        pwm_esc.ChangeDutyCycle(zero_speed)  # Stop the car
                        break  # Exit the loop to end the script

            wait(0.023)

            lastTime = now
            lastError = error
            counter += 1

    finally:
        # Reset car and close everything
        reset_car()
        video.release()
        GPIO.cleanup()  # release the PWM pins and encoder interrupt

    # Save plots
    plot_pd(p_vals, d_vals, err_vals)
    plot_pwm(speed_vals, steer_vals, speed_vals_kmh, err_vals)

    print("dimensions")
    print(speed_vals)
    print(steer_vals)
    print(err_vals)

if __name__ == "__main__":
    main()
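
Run the controller directly with python3 RC_Car_control_final.py; debug frames (edges, raw frames, detected lanes) are written to output_images/ every 10 ticks, and the PD and PWM plots are saved to plots/ when the run ends.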

speed_controller.c

C/C++
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/init.h>
#include <linux/interrupt.h>
#include <linux/hrtimer.h>
#include <linux/ktime.h>
#include <linux/types.h>
static struct gpio_desc *speedometer_gpio; //gpio pin for speed encoder
unsigned int irq_number;
static int value;
// Timer variables
static struct hrtimer my_timer;
static ktime_t interval; // periodic timer interval (500 ms)
ktime_t now;
static ktime_t start_time; // time light starts shining
static ktime_t stop_time; // time light stops shining
static u64 pulse_duration_ns = 0;

enum hrtimer_restart my_timer_callback(struct hrtimer *timer)
{
	pr_info("Timer fired!\n");
	// Re-arm the timer for periodic behavior
	hrtimer_forward_now(timer, interval);
	return HRTIMER_RESTART;
}
static irqreturn_t gpio_irq_handler(int irq, void *dev_id)
{
	ktime_t irq_time = ktime_get();

	pr_info("Interrupt triggered. ISR called.\n");
	value = gpiod_get_value(speedometer_gpio); // gpiod_* API matches the gpio_desc handle
	pr_info("Voltage reading: %d\n", value);
	if (value == 1) {
		start_time = irq_time;
	} else {
		stop_time = irq_time;
		pulse_duration_ns = ktime_to_ns(ktime_sub(stop_time, start_time));
		pr_info("Pulse duration: %llu ns\n", pulse_duration_ns);
	}
	return IRQ_HANDLED;
}
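
/*
 * Note: pulse_duration_ns measures how long the encoder slot stays open
 * (rising edge to falling edge). To turn this into wheel speed, time two
 * successive rising edges instead: with 20 slots per revolution (matching the
 * Python controller), rpm = 60e9 / (20 * period_ns).
 */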
// probe function
static int speedometer_probe(struct platform_device *pdev)
{
	int return_value;

	pr_info("Loading speed module...\n");
	// GPIO pin init: "speed" matches the speed-gpios property in the overlay
	speedometer_gpio = gpiod_get(&pdev->dev, "speed", GPIOD_IN);
	if (IS_ERR(speedometer_gpio)) {
		pr_err("Failed to get speed GPIO\n");
		return PTR_ERR(speedometer_gpio);
	}
	pr_info("GPIO pin init complete\n");
	// Interrupt init: fire on both edges so we see the start and end of each pulse
	irq_number = gpiod_to_irq(speedometer_gpio);
	return_value = request_irq(irq_number, gpio_irq_handler,
				   IRQF_TRIGGER_RISING | IRQF_TRIGGER_FALLING,
				   "speedometer_irq", pdev);
	if (return_value) {
		pr_info("Failed IRQ.\n");
		gpiod_put(speedometer_gpio);
		return return_value;
	}
	pr_info("Interrupt init complete\n");
	// Timer init
	interval = ktime_set(0, 500 * 1000000); // 500 ms
	hrtimer_init(&my_timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
	my_timer.function = my_timer_callback;
	hrtimer_start(&my_timer, interval, HRTIMER_MODE_REL);
	now = ktime_get();
	pr_info("Timer init complete\n");
	return 0;
}
// remove function
static int speedometer_remove(struct platform_device *pdev)
{
	// hrtimer_cancel() returns nonzero if the timer was still active
	int ret = hrtimer_cancel(&my_timer);

	pr_info("Removing speed module...\n");
	// free IRQ
	free_irq(irq_number, pdev);
	pr_info("IRQ freed successfully!\n");
	// free GPIO
	gpiod_put(speedometer_gpio);
	if (ret)
		pr_info("Timer was active and has been cancelled\n");
	else
		pr_info("Timer was not active\n");
	return 0;
}
static struct of_device_id matchy_match[] = {
	{ .compatible = "elec553,mydevice" },
	{},
};
// platform driver object
static struct platform_driver speed_encoder = {
	.probe = speedometer_probe,
	.remove = speedometer_remove,
	.driver = {
		.name = "The Rock: this name doesn't even matter",
		.owner = THIS_MODULE,
		.of_match_table = matchy_match,
	},
};
module_platform_driver(speed_encoder);
MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("GOAT");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");

speed_controller.dts

C/C++
/dts-v1/;
/plugin/;

/ {
	fragment@0 {
		target-path = "/";
		__overlay__ {
			mydevice {
				compatible = "elec553,mydevice";
				speed-gpios = <&gpio 6 0>;
			};
		};
	};
};
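
The speed-gpios = <&gpio 6 0>; property is what the driver's gpiod_get(&pdev->dev, "speed", GPIOD_IN) call resolves: the "speed" con_id is matched against the speed-gpios property name, and GPIO 6 is the same ENCODER_PIN used in the Python controller.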

Makefile

C/C++
# Modified from:
# Dr. Derek Molloy, School of Electronic Engineering, Dublin City University,
# Ireland. URL: http://derekmolloy.ie/writing-a-linux-kernel-module-part-1-introduction/
PWD=$(shell pwd)
KERNEL_BUILD=/lib/modules/$(shell uname -r)/build

obj-m+=gpiod_driver.o

all:
	make -C $(KERNEL_BUILD) M=$(PWD) modules
clean:
	make -C $(KERNEL_BUILD) M=$(PWD) clean
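
# A typical build-and-load sequence (assuming the module source file is named
# gpiod_driver.c to match obj-m above, and that the overlay is compiled and
# loaded first so the "elec553,mydevice" node exists):
#   dtc -@ -I dts -O dtb -o speed_controller.dtbo speed_controller.dts
#   sudo dtoverlay speed_controller.dtbo
#   make
#   sudo insmod gpiod_driver.ko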

detect_YOLO_final.py

Python
# Ultralytics  AGPL-3.0 License - https://ultralytics.com/license

import argparse
import csv
import os
import sys
import time
from pathlib import Path
from typing import Any

import torch
import cv2
import numpy as np

FILE = Path(__file__).resolve()
ROOT = FILE.parents[0]  # YOLOv5 root directory
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))  # add ROOT to PATH
ROOT = Path(os.path.relpath(ROOT, Path.cwd()))  # relative

# General YOLO5 setup
from ultralytics.utils.plotting import Annotator, colors, save_one_box
from models.common import DetectMultiBackend
from utils.general import (
    LOGGER,
    Profile,
    check_file,
    check_img_size,
    check_requirements,
    colorstr,
    increment_path,
    non_max_suppression,
    scale_boxes,
    strip_optimizer,
    xyxy2xywh,
)
from utils.torch_utils import select_device, smart_inference_mode


def print_args(args: Any) -> None:
    """Print function arguments with their values."""
    LOGGER.info("Arguments and their values:")
    for k, v in sorted(vars(args).items() if hasattr(args, "__dict__") else args.items()):
        LOGGER.info(f"{k}: {v}")


@smart_inference_mode()
def run(
    # All parameters (missing ones restored from YOLOv5's detect.py defaults,
    # since they are referenced in the body below)
    weights=ROOT / "yolov5n.pt",  # lightweight model for Raspberry Pi
    source="0",  # default to webcam (Raspberry Pi camera or USB webcam)
    data=ROOT / "data/coco128.yaml",  # dataset
    imgsz=(320, 320),  # inference size (height, width) reduced for performance
    conf_thres=0.25,  # confidence threshold
    iou_thres=0.45, max_det=1000,  # NMS IoU threshold, max detections per image
    device="", dnn=False, half=False,  # device and backend options
    classes=None, agnostic_nms=False,  # class filtering and NMS options
    augment=False, visualize=False, update=False,
    project=ROOT / "runs/detect", name="exp",  # save results to project/name
    exist_ok=False,  # existing project/name ok
    line_thickness=3, hide_labels=False, hide_conf=False,  # annotation options
    save_txt=False, nosave=False,  # saving options
    vid_stride=2,  # video frame-rate stride (skip every 2nd frame)
    max_frames=100,  # number of frames to record
    output_fps=20,  # output video FPS
):
    source = str(source)
    # enable saving inference images
    save_img = not nosave  

    # Directories
    save_dir = increment_path(Path(project) / name, exist_ok=exist_ok)  
    # make dir
    (save_dir / "labels" if save_txt else save_dir).mkdir(parents=True, exist_ok=True)  

    # Load model
    device = select_device(device)  
    model = DetectMultiBackend(weights, device=device, dnn=dnn, data=data, fp16=half)
    stride, names, pt = model.stride, model.names, model.pt
    # check image size
    imgsz = check_img_size(imgsz, s=stride)  

    # Initialize camera
    cap = cv2.VideoCapture(int(source) if source.isnumeric() else source)
    if not cap.isOpened():
        raise ValueError(f"Failed to open camera source: {source}")
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30

    # Initialize video writer
    save_path = str(save_dir / "output.mp4")
    vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*"mp4v"), output_fps, (w, h))

    # Run inference
    model.warmup(imgsz=(1, 3, *imgsz))  # warmup
    seen, dt = 0, (Profile(device=device), Profile(device=device), Profile(device=device))
    frame_count = 0
    try:
        # Record the max frames specified in argument
        while frame_count < max_frames:
            # Read frame (vid_stride frames are read; only the last is processed)
            ret = False
            for _ in range(vid_stride):
                ret, img0 = cap.read()
            if not ret:
                LOGGER.info("Camera read failed, stopping")
                break

            # Preprocess frame
            with dt[0]:
                img = cv2.resize(img0, (imgsz[1], imgsz[0]), interpolation=cv2.INTER_LINEAR)  # cv2.resize takes (w, h)
                img = img[:, :, ::-1]  # BGR to RGB
                img = img.transpose((2, 0, 1))  # HWC to CHW
                img = np.ascontiguousarray(img)
                im = torch.from_numpy(img).to(model.device)
                im = im.half() if model.fp16 else im.float()  # uint8 to fp16/32
                im /= 255  # 0 - 255 to 0.0 - 1.0
                if len(im.shape) == 3:
                    im = im[None]  

            # Inference
            with dt[1]:
                pred = model(im, augment=augment, visualize=visualize)

            # Non max suppression
            with dt[2]:
                pred = non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det)

            # Process predictions
            seen += 1
            frame_count += 1
            s = f"Frame {frame_count}/{max_frames}: {imgsz[1]}x{imgsz[0]} "
            annotator = Annotator(img0, line_width=line_thickness, example=str(names))
            if len(pred[0]):
                # Rescale boxes from img_size to im0 size
                pred[0][:, :4] = scale_boxes(im.shape[2:], pred[0][:, :4], img0.shape).round()

                # Print results
                for c in pred[0][:, 5].unique():
                    n = (pred[0][:, 5] == c).sum()  # detections per class
                    s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "  

                # Write results
                for *xyxy, conf, cls in reversed(pred[0]):
                    c = int(cls)  # integer class
                    label = None if hide_labels else (names[c] if hide_conf else f"{names[c]} {conf:.2f}")
                    annotator.box_label(xyxy, label, color=colors(c, True))

            # Save results to video
            im0 = annotator.result()
            if save_img:
                vid_writer.write(im0)

            # Print time (inference-only)
            LOGGER.info(f"{s}{'' if len(pred[0]) else '(no detections), '}{dt[1].dt * 1e3:.1f}ms")

    finally:
        LOGGER.info("Starting cleanup...")
        if vid_writer:
            LOGGER.info("Releasing video writer...")
            vid_writer.release()
        if cap:
            LOGGER.info("Releasing camera...")
            cap.release()
        LOGGER.info("Clearing PyTorch memory...")
        if 'im' in locals():
            del im
        if 'pred' in locals():
            del pred
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        LOGGER.info("Cleanup completed.")

    # Print results
    t = tuple(x.t / seen * 1e3 for x in dt)  # speeds per image
    LOGGER.info(f"Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {(1, 3, *imgsz)}" % t)
    LOGGER.info(f"Video saved to {colorstr('bold', save_path)}")
    if update:
        strip_optimizer(weights[0])  # update model 


def parse_opt():
    """Parse command-line arguments for YOLOv5 detection on Raspberry Pi with video saving."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--weights", nargs="+", type=str, default=ROOT / "yolov5n.pt", help="model path or triton URL")
    parser.add_argument("--source", type=str, default="0", help="webcam (0 for Raspberry Pi camera)")
    parser.add_argument("--data", type=str, default=ROOT / "data/coco128.yaml", help="(optional) dataset.yaml path")
    parser.add_argument("--imgsz", "--img", "--img-size", nargs="+", type=int, default=[320], help="inference size h,w")
    parser.add_argument("--conf-thres", type=float, default=0.25, help="confidence threshold")
    parser.add_argument("--exist-ok", action="store_true", help="existing project/name ok, do not increment")
    parser.add_argument("--vid-stride", type=int, default=2, help="video frame-rate stride")
    parser.add_argument("--max-frames", type=int, default=100, help="number of frames to record")
    parser.add_argument("--output-fps", type=int, default=20, help="output video FPS")
    opt = parser.parse_args()
    opt.imgsz *= 2 if len(opt.imgsz) == 1 else 1  # expand
    print_args(vars(opt))
    return opt


def main(opt):
    """Run YOLOv5 inference on a live Raspberry Pi camera feed, saving the result to video."""
    check_requirements(ROOT / "requirements.txt", exclude=("tensorboard", "thop"))
    run(**vars(opt))


if __name__ == "__main__":
    opt = parse_opt()
    main(opt)
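
With the defaults above, running python3 detect_YOLO_final.py --source 0 --max-frames 100 records 100 annotated webcam frames and writes them to output.mp4 inside the incremented save directory (runs/detect/exp with the restored YOLOv5 defaults).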

Credits

Lester Liu
1 project • 0 followers
Anh Pham
1 project • 0 followers
Han Feng
1 project • 0 followers
Elliot Metcalf
1 project • 0 followers
