Hardware components and software apps: see the "Things" section of this project page for the full list.
Project summary: This is Team Skyrover. Our project uses a solely vision-based approach to implement, tune, and test elementary "autonomous" functions on a remote-controlled (RC) car platform. We developed a self-driving car that navigates lanes with precision, guided by feedback from a mounted camera.
Project Inspiration:
The project takes inspiration from the following projects, also found on Hackster and Instructables:
1) Raja_961, Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV. Retrieved December 7, 2023, from https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
2) Big Brains: https://www.hackster.io/big-brains/big-brains-elec-424-final-project-team-11-53c862
3) graDudes: https://www.hackster.io/439751/gradudes-9d9a6e
Hardware Setup: The following picture shows the hardware setup for the self-driving car. Please refer to the "Things" section for the materials used in the project.
PD Controller:
A resolution of 160x120 was chosen because it gave the best balance between processing speed and lane-detection accuracy for keeping the car within the blue lane lines. The proportional gain (kp = 0.08) provides steering adjustments proportional to the error in lane alignment while avoiding oversteering on the track's curves. The derivative gain (kd = kp * 0.3) damps the response to stabilize steering, especially during sharp lane turns.
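As a minimal sketch of how these gains shape the steering command (the gains and the 6.0-9.0 duty-cycle clamp match the full control loop listed below; the pd_steering() helper itself is hypothetical, introduced only for illustration):
# Hypothetical helper for illustration; constants match the main loop below.
kp = 0.08      # proportional gain
kd = kp * 0.3  # derivative gain

def pd_steering(error, last_error, dt, base_turn=7.5):
    # error: steering_angle - 90 (degrees); dt: seconds since the last frame (> 0)
    proportional = kp * error
    derivative = kd * (error - last_error) / dt
    # Clamp to the servo's usable duty-cycle range, as the main loop does
    return max(6.0, min(9.0, base_turn + proportional + derivative))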
Stop Paper Recognition:
We handled stop-paper (red box) detection with the isRedFloorVisible() function, which applies a red color mask to the captured frame and checks whether the detected red area exceeds a threshold percentage. When the first stop paper is detected, the car stops for five seconds before resuming movement; when a second red box is detected later, the car stops permanently by breaking out of the main loop. To keep detection reliable while reducing processing load, the system only checks for stop papers every 20 ticks.
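Red sits at both ends of OpenCV's 0-180 hue scale, so the mask combines two hue ranges. A condensed sketch of the check (the thresholds mirror isRedFloorVisible() in the full listing below; the red_area_percentage() helper is hypothetical, for illustration only):
# Hypothetical helper for illustration; thresholds mirror isRedFloorVisible() below.
import cv2
import numpy as np

def red_area_percentage(frame_bgr):
    # Percentage of pixels falling in either red hue band
    hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)
    mask1 = cv2.inRange(hsv, np.array([0, 40, 60], "uint8"), np.array([10, 255, 255], "uint8"))
    mask2 = cv2.inRange(hsv, np.array([170, 40, 60], "uint8"), np.array([180, 255, 255], "uint8"))
    mask = cv2.bitwise_or(mask1, mask2)
    return np.count_nonzero(mask) * 100 / mask.size

# The car treats the frame as a stop paper when more than 25% of it is red.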
Plots:
RC car video
Object Recognition:
To implement object recognition, the team connected a Logitech webcam to the Raspberry Pi via USB to record video. We used the YOLOv5n model from Ultralytics because it provides good speed and accuracy while being lightweight enough to run on the Pi's limited hardware; on average, one frame was processed every two seconds. Processing consisted of detecting objects and annotating them with bounding boxes and class labels using OpenCV. The code initializes the webcam, preprocesses each frame, and runs inference with YOLOv5n. After capturing 100 frames, we compiled them into an .mp4 file and slowed it down for review.
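For reference, here is a minimal stand-alone sketch of the same per-frame detection step. It uses the public torch.hub entry point for YOLOv5 (which downloads the model on first use, assuming network access) rather than the repository utilities that the full script below relies on:
# Minimal sketch via torch.hub; the full script below loads the model differently.
import cv2
import torch

model = torch.hub.load("ultralytics/yolov5", "yolov5n")  # lightweight YOLOv5n

cap = cv2.VideoCapture(0)  # USB webcam
ret, frame = cap.read()
if ret:
    results = model(frame[:, :, ::-1])  # BGR -> RGB before inference
    results.print()                     # per-class counts and timing
    annotated = results.render()[0]     # frame with boxes and labels drawn
cap.release()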
YOLO recognition video
Code references:
Raja_961: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
Big Brains: https://www.hackster.io/big-brains/big-brains-elec-424-final-project-team-11-53c862
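# Lane-keeping control script (Python): lane detection, PD steering,
# encoder-based speed control, and stop-paper handling.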
import cv2
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import time
import os
import RPi.GPIO as GPIO
# Create output directories for images and plots
OUTPUT_DIR = "output_images"
PLOTS_DIR = "plots"
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
if not os.path.exists(PLOTS_DIR):
os.makedirs(PLOTS_DIR)
# GPIO setup
GPIO.setmode(GPIO.BCM)
# Motor and servo setup
GPIO.setup(18, GPIO.OUT) # Initializes GPIO 18 for motor
GPIO.setup(19, GPIO.OUT) # Initializes GPIO 19 for steering
pwm_esc = GPIO.PWM(18, 50) # Creates a PWM at GPIO 18 running at 50Hz
pwm_serv = GPIO.PWM(19, 50) # Creates a PWM at GPIO 19 running at 50Hz
pwm_esc.start(7.5) # Puts DC motor on neutral
pwm_serv.start(7.5) # Sets servo motor straight
# Speed encoder setup
ENCODER_PIN = 6 # GPIO pin for the encoder's OUT pin
GPIO.setup(ENCODER_PIN, GPIO.IN)
# Constants for speed calculation
PULSES_PER_REVOLUTION = 20  # encoder pulses per wheel revolution
WHEEL_CIRCUMFERENCE = 0.2  # wheel circumference in meters
SPEED_CHANGE = 0.001  # duty-cycle step used when adjusting speed
ENCODER_MAX_SPEED = 5.0  # km/h; slow down above this speed
ENCODER_MIN_SPEED = 1.0  # km/h; speed up below this speed
# Variables for speed encoder
last_activation_time = 0 # To store the time of the last encoder activation
current_speed_kmh = 0 # Current speed in km/h
speed_vals_kmh = [] # To store actual speed values for plotting
# Callback function to handle encoder activations and calculate speed
def encoder_callback(channel):
global last_activation_time, current_speed_kmh
current_time = time.time()
# Skip the first activation since we need two timestamps
if last_activation_time != 0:
time_between_activations = current_time - last_activation_time
if time_between_activations > 0:
frequency = 1 / time_between_activations
rpm = (frequency / PULSES_PER_REVOLUTION) * 60
speed_mps = (rpm / 60) * WHEEL_CIRCUMFERENCE
current_speed_kmh = speed_mps * 3.6
last_activation_time = current_time
# Add event detection for the encoder
GPIO.add_event_detect(ENCODER_PIN, GPIO.FALLING, callback=encoder_callback, bouncetime=2)
# Camera
frame_width = 160
frame_height = 120
cam_idx = 0
# PD variables
kp = 0.08 # Proportional gain
kd = kp * 0.3 # Derivative gain
# Speed Values
zero_speed = 7.5 # Car is stopped
base_speed = 7.9 # Speed for middle ground
zero_turn = 7.5 # Neutral steering
# Turn threshold for stop-and-turn
turn_threshold = 1.5 # Increased to reduce stops (was 1.2)
turn_wait_time = 0.3 # Wait time for turns
# Max number of loop
max_ticks = 4000
def reset_car():
# Set speed to zero and steering to neutral
pwm_esc.ChangeDutyCycle(7.5)
pwm_serv.ChangeDutyCycle(7.5)
def start_car():
# Set speed to base speed and steering to neutral
print("Car ready")
pwm_esc.ChangeDutyCycle(base_speed)
pwm_serv.ChangeDutyCycle(7.5)
def wait(wait_time):
# Wait for a period of time
start_time = time.perf_counter() # Start
end_time = start_time + wait_time # End
# Loop until finished
while (time.perf_counter() < end_time):
pass
return
def detect_edges(frame):
# Detect the blue lines
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
lower_blue = np.array([90, 50, 50], dtype="uint8")
upper_blue = np.array([130, 255, 255], dtype="uint8")
# Make mask
mask = cv2.inRange(hsv, lower_blue, upper_blue)
# Get edges
edges = cv2.Canny(mask, 50, 100)
return edges
def region_of_interest(edges):
# Find region to look at
height, width = edges.shape
mask = np.zeros_like(edges)
# Only focus lower half of the screen
polygon = np.array([[
(0, height),
(0, height / 2),
(width, height / 2),
(width, height),
]], np.int32)
cv2.fillPoly(mask, polygon, 255)
# Get region to look at
cropped_edges = cv2.bitwise_and(edges, mask)
return cropped_edges
def detect_line_segments(cropped_edges):
# Find line segments
rho = 1
theta = np.pi / 180
min_threshold = 10
# Get lines
line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
np.array([]), minLineLength=5, maxLineGap=150)
return line_segments
def average_slope_intercept(frame, line_segments):
# Finding slope of the line
lane_lines = []
if line_segments is None:
print("No line segments detected")
return lane_lines
# Set boundaries
height, width, _ = frame.shape
left_fit = []
right_fit = []
boundary = 1 / 3
left_region_boundary = width * (1 - boundary)
right_region_boundary = width * boundary
# Go through line segments and get line of best fit
for line_segment in line_segments:
for x1, y1, x2, y2 in line_segment:
if x1 == x2:
continue
            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope, intercept = fit
if slope < 0:
if x1 < left_region_boundary and x2 < left_region_boundary:
left_fit.append((slope, intercept))
else:
if x1 > right_region_boundary and x2 > right_region_boundary:
right_fit.append((slope, intercept))
    # Take the average line of best fit on each side, skipping sides with no segments
    # (np.average on an empty list would produce NaN)
    if len(left_fit) > 0:
        left_fit_average = np.average(left_fit, axis=0)
        lane_lines.append(make_points(frame, left_fit_average))
    if len(right_fit) > 0:
        right_fit_average = np.average(right_fit, axis=0)
        lane_lines.append(make_points(frame, right_fit_average))
return lane_lines
def make_points(frame, line):
# For getting visual of lines
height, width, _ = frame.shape
slope, intercept = line
y1 = height # Bottom of the frame
y2 = int(y1 / 2) # Make points from middle of the frame down
if slope == 0:
slope = 0.1
x1 = int((y1 - intercept) / slope)
x2 = int((y2 - intercept) / slope)
return [[x1, y1, x2, y2]]
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
# Get visual of lines
line_image = np.zeros_like(frame)
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
return line_image
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
# Show a heading line
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
steering_angle_radian = steering_angle / 180.0 * math.pi
x1 = int(width / 2)
y1 = height
x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
return heading_image
def get_steering_angle(frame, lane_lines):
# Get the angle to steer towards
height, width, _ = frame.shape
if len(lane_lines) == 2:
_, _, left_x2, _ = lane_lines[0][0]
_, _, right_x2, _ = lane_lines[1][0]
mid = int(width / 2)
x_offset = (left_x2 + right_x2) / 2 - mid
y_offset = int(height / 2)
elif len(lane_lines) == 1:
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
y_offset = int(height / 2)
elif len(lane_lines) == 0:
x_offset = 0
y_offset = int(height / 2)
angle_to_mid_radian = math.atan(x_offset / y_offset)
angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
steering_angle = angle_to_mid_deg + 90
return steering_angle
def manage_speed(curr_speed):
# Adjust the speed based on the speed encoder
global current_speed_kmh
speed_adjustment = 0 # Default: no speed change
if current_speed_kmh > ENCODER_MAX_SPEED: # Too fast
speed_adjustment = -SPEED_CHANGE # Reduce speed
elif current_speed_kmh < ENCODER_MIN_SPEED: # Too slow
speed_adjustment = SPEED_CHANGE # Increase speed
return speed_adjustment
def plot_pd(p_vals, d_vals, error, show_img=False):
# Plot the proportional, derivative, and error
fig, ax1 = plt.subplots()
t_ax = np.arange(len(p_vals))
ax1.plot(t_ax, p_vals, '-', label="P values")
ax1.plot(t_ax, d_vals, '-', label="D values")
ax2 = ax1.twinx()
ax2.plot(t_ax, error, '--r', label="Error")
ax1.set_xlabel("Frames")
ax1.set_ylabel("PD Value")
ax2.set_ylim(-90, 90)
ax2.set_ylabel("Error Value")
plt.title("PD Values over time")
fig.legend()
fig.tight_layout()
plt.savefig(os.path.join(PLOTS_DIR, "pd_plot.png"))
if show_img:
plt.show()
plt.clf()
def plot_pwm(speed_vals, turn_vals, speed_vals_kmh, error, show_img=False):
# Plot the speed, steering, and error
fig, ax1 = plt.subplots()
t_ax = np.arange(len(speed_vals))
ax1.plot(t_ax, speed_vals, '-', label="Speed Duty Cycle")
ax1.plot(t_ax, turn_vals, '-', label="Steering Duty Cycle")
ax2 = ax1.twinx()
    err_arr = np.asarray(error, dtype=float)  # a plain list cannot be divided by a scalar
    ax2.plot(t_ax, err_arr / np.max(np.abs(err_arr)), '--r', label="Error")
ax1.set_xlabel("Frames")
ax1.set_ylabel("Duty Cycle")
ax2.set_ylabel("Percent Error Value")
plt.title("Speed, Steering, and Error vs. Time")
fig.legend()
plt.savefig(os.path.join(PLOTS_DIR, "voltage_plot.png"))
if show_img:
plt.show()
plt.clf()
def isRedFloorVisible(image):
# Detects whether or not the majority of a color on the screen is a particular color
hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
cv2.imwrite(os.path.join(OUTPUT_DIR, "redfloor.jpg"), hsv_img)
percentage = 25
lower_red1 = np.array([0, 40, 60], dtype="uint8")
upper_red1 = np.array([10, 255, 255], dtype="uint8")
lower_red2 = np.array([170, 40, 60], dtype="uint8")
upper_red2 = np.array([180, 255, 255], dtype="uint8")
mask1 = cv2.inRange(hsv_img, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv_img, lower_red2, upper_red2)
mask = cv2.bitwise_or(mask1, mask2)
output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)
cv2.imwrite(os.path.join(OUTPUT_DIR, "redfloormask.jpg"), output)
percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)
result = percentage < percentage_detected
if result:
print(percentage_detected)
return result, output
def main():
lastTime = 0
lastError = 0
SecondStopTick = 0
# Arrays for making the graphs
p_vals = [] # Proportional
d_vals = [] # Derivative
err_vals = [] # Error
speed_vals = [] # Speed values
steer_vals = [] # Steering values
# Set up video
video = cv2.VideoCapture(cam_idx)
video.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)
try:
# Start car
start_car()
curr_speed = base_speed
counter = 0
passedFirstStopSign = False
while counter < max_ticks:
print(f"Counter: {counter}, Speed: {curr_speed}, Actual Speed: {current_speed_kmh:.2f} km/h")
# Manage video
ret, original_frame = video.read()
if not ret:
print("Failed to capture frame")
break
frame = cv2.resize(original_frame, (160, 120))
# Process the frame to determine the desired steering angle
edges = detect_edges(frame)
# Save edges every 10 frames
if counter % 10 == 0:
cv2.imwrite(os.path.join(OUTPUT_DIR, f"edges_{counter}.jpg"), edges)
cv2.imwrite(os.path.join(OUTPUT_DIR, f"frame_{counter}.jpg"), frame) # Save original frame
roi = region_of_interest(edges)
line_segments = detect_line_segments(roi)
lane_lines = average_slope_intercept(frame, line_segments)
lane_lines_image = display_lines(frame, lane_lines)
if counter % 10 == 0:
cv2.imwrite(os.path.join(OUTPUT_DIR, f"lanes_{counter}.jpg"), lane_lines_image) # Save lane lines
steering_angle = get_steering_angle(frame, lane_lines)
heading_image = display_heading_line(lane_lines_image, steering_angle)
# Calculate changes for PD
now = time.time()
dt = now - lastTime if now - lastTime > 0 else 0.001 # Prevent division by zero
deviation = steering_angle - 90
# PD Code
error = deviation
base_turn = 7.5
proportional = kp * error
derivative = kd * (error - lastError) / dt
# Take values for graphs
p_vals.append(proportional)
d_vals.append(derivative)
err_vals.append(error)
speed_vals.append(curr_speed)
speed_vals_kmh.append(current_speed_kmh)
# Determine actual turn to do
turn_amt = base_turn + proportional + derivative
# Clamp turn_amt to the desired range (6 to 9)
turn_amt = max(6.0, min(9.0, turn_amt))
steer_vals.append(turn_amt)
# Print output every 10 frames
if counter % 10 == 0:
print(f"Steering_angle: {steering_angle}, Error: {error}, Turn_amt: {turn_amt}")
# Stop-and-turn logic
if abs(turn_amt - zero_turn) > turn_threshold:
# Significant turn: stop, turn, then resume
print(f"Stopping to turn: turn_amt={turn_amt}, steering_angle={steering_angle}")
#pwm_esc.ChangeDutyCycle(zero_speed)
pwm_esc.ChangeDutyCycle(7.6)
pwm_serv.ChangeDutyCycle(turn_amt)
wait(turn_wait_time)
pwm_esc.ChangeDutyCycle(curr_speed)
else:
# Minor turn: steer while moving
pwm_esc.ChangeDutyCycle(curr_speed)
pwm_serv.ChangeDutyCycle(turn_amt)
# Speed encoding: adjust speed based on encoder readings
if counter % 3 == 0: # Check every 3 ticks
speed_adjustment = manage_speed(curr_speed)
temp_speed = curr_speed + speed_adjustment
temp_speed = max(7.5, min(8.5, temp_speed))
if temp_speed != curr_speed:
curr_speed = temp_speed
pwm_esc.ChangeDutyCycle(curr_speed)
# Stop if sees the red box
if counter % 20 == 0: # Check every 20 ticks to reduce processing
if not passedFirstStopSign:
isStopSign, floorSight = isRedFloorVisible(frame)
if isStopSign:
print("First red box detected! Stopping for 5 seconds...")
pwm_esc.ChangeDutyCycle(zero_speed) # Stop the car
wait(5) # Wait for 5 seconds
print("Resuming after 5 seconds...")
passedFirstStopSign = True
SecondStopTick = counter
pwm_esc.ChangeDutyCycle(curr_speed) # Resume moving
# Stopping permanently if second red box detected
elif passedFirstStopSign and counter > SecondStopTick + 100:
isStopSign, _ = isRedFloorVisible(frame)
if isStopSign:
print("Second red box detected! Stopping permanently...")
pwm_esc.ChangeDutyCycle(zero_speed) # Stop the car
break # Exit the loop to end the script
wait(0.023)
lastTime = now
lastError = error
counter += 1
    finally:
        # Reset the car and release the camera even if the loop exits early
        reset_car()
        video.release()
        # Save plots
        plot_pd(p_vals, d_vals, err_vals)
        plot_pwm(speed_vals, steer_vals, speed_vals_kmh, err_vals)
        print("Logged values (speed, steering, error):")
        print(speed_vals)
        print(steer_vals)
        print(err_vals)
if __name__ == "__main__":
main()
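/* gpiod_driver.c: kernel module that times the optical speed encoder's
 * pulses using a GPIO interrupt and an hrtimer. */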
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/init.h>
#include <linux/interrupt.h>
#include <linux/hrtimer.h>
#include <linux/ktime.h>
#include <linux/types.h>
static struct gpio_desc *speedometer_gpio; //gpio pin for speed encoder
unsigned int irq_number;
static int value;
// Timer variables
static struct hrtimer my_timer;
static ktime_t interval; //record the interval of light shining through
ktime_t now;
//static int shining;
static ktime_t start_time; //time light starts shining
static ktime_t stop_time; //time light stops shining
static u64 pulse_duration_ns = 0;
enum hrtimer_restart my_timer_callback(struct hrtimer *timer)
{
pr_info("Timer fired!\n");
// Re-arm the timer for periodic behavior
hrtimer_forward_now(timer, interval);
return HRTIMER_RESTART;
}
static irqreturn_t gpio_irq_handler(int irq, void *dev_id)
{
	ktime_t now = ktime_get();
	pr_info("Interrupt triggered. ISR called.\n");
	/* gpiod_get_value() is the accessor that matches the gpio_desc handle */
	value = gpiod_get_value(speedometer_gpio);
	pr_info("Voltage reading: %d\n", value);
	if (value == 1) {
		start_time = now;
	} else {
		pulse_duration_ns = ktime_to_ns(ktime_sub(now, start_time));
		pr_info("Pulse duration: %llu ns\n", pulse_duration_ns);
	}
	return IRQ_HANDLED;
}
// probe function
static int speedometer_probe(struct platform_device *pdev)
{
int return_value;
pr_info("Loading speed module...\n");
//GPIO pins init
speedometer_gpio = gpiod_get(&pdev->dev, "speed", GPIOD_IN);
pr_info("GPIO pin init complete\n");
//interrupt init
irq_number = gpiod_to_irq(speedometer_gpio);
	return_value = request_irq(irq_number, gpio_irq_handler,
				   IRQF_TRIGGER_RISING | IRQF_TRIGGER_FALLING,
				   "speedometer_irq", pdev);
if (return_value){
pr_info("Failed IRQ.\n");
return return_value;
}
pr_info("Interrupt init complete\n");
// Timer init
interval = ktime_set(0, 500 * 1000000); // 500ms
hrtimer_init(&my_timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
my_timer.function = my_timer_callback;
hrtimer_start(&my_timer, interval, HRTIMER_MODE_REL);
now = ktime_get();
pr_info("Timer init complete\n");
return 0;
}
// remove function
static int speedometer_remove(struct platform_device *pdev)
{
int ret = hrtimer_cancel(&my_timer);
pr_info("Removing speed module...\n");
//free irq
free_irq(irq_number, pdev);
pr_info("IRQ freed successfully!\n");
//free GPIO
gpiod_put(speedometer_gpio);
// Cancel the timer; pr_info is just printk with KERN_INFO
if (ret)
pr_info("Timer was active and has been cancelled\n");
else
pr_info("Timer was not active\n");
return 0;
}
static struct of_device_id matchy_match[] = {
{.compatible = "elec553,mydevice"},
{},
};
// platform driver object
static struct platform_driver speed_encoder = {
.probe = speedometer_probe,
.remove = speedometer_remove,
.driver = {
.name = "The Rock: this name doesn't even matter",
.owner = THIS_MODULE,
.of_match_table = matchy_match,
},
};
module_platform_driver(speed_encoder);
MODULE_DESCRIPTION("424's finest");
MODULE_AUTHOR("GOAT");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");
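/* Device tree overlay exposing the encoder GPIO to the driver above. */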
/dts-v1/;
/plugin/;
/ {
fragment@0 {
target-path = "/";
		__overlay__ {
mydevice {
compatible = "elec553,mydevice";
speed-gpios = <&gpio 6 0>;
};
};
};
};
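# Makefile for building the gpiod_driver kernel module.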
# Modified from:
# Dr. Derek Molloy, School of Electronic Engineering, Dublin City University,
# Ireland. URL: http://derekmolloy.ie/writing-a-linux-kernel-module-part-1-introduction/
PWD := $(shell pwd)
KERNEL_BUILD := /lib/modules/$(shell uname -r)/build
obj-m += gpiod_driver.o
all:
	make -C $(KERNEL_BUILD) M=$(PWD) modules
clean:
	make -C $(KERNEL_BUILD) M=$(PWD) clean
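# Object-recognition script: YOLOv5n inference on webcam frames,
# saved to an annotated .mp4.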
# Ultralytics AGPL-3.0 License - https://ultralytics.com/license
import argparse
import csv
import os
import sys
import time
from pathlib import Path
from typing import Any
import torch
import cv2
import numpy as np
FILE = Path(__file__).resolve()
ROOT = FILE.parents[0] # YOLOv5 root directory
if str(ROOT) not in sys.path:
sys.path.append(str(ROOT)) # add ROOT to PATH
ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative
# General YOLOv5 setup
from ultralytics.utils.plotting import Annotator, colors, save_one_box
from models.common import DetectMultiBackend
from utils.general import (
LOGGER,
Profile,
check_file,
check_img_size,
check_requirements,
colorstr,
increment_path,
non_max_suppression,
scale_boxes,
strip_optimizer,
xyxy2xywh,
)
from utils.torch_utils import select_device, smart_inference_mode
def print_args(args: Any) -> None:
#Print function arguments with their values.
LOGGER.info("Arguments and their values:")
for k, v in sorted(vars(args).items() if hasattr(args, '__dict__') else args.items()):
LOGGER.info(f"{k}: {v}")
@smart_inference_mode()
def run(
    weights=ROOT / "yolov5n.pt",  # lightweight model for Raspberry Pi
    source="0",  # default to webcam (Raspberry Pi camera or USB webcam)
    data=ROOT / "data/coco128.yaml",  # dataset.yaml path
    imgsz=(320, 320),  # inference size (height, width), reduced for performance
    conf_thres=0.25,  # confidence threshold
    iou_thres=0.45,  # NMS IoU threshold
    max_det=1000,  # maximum detections per frame
    device="",  # CUDA device or "" for auto/CPU
    nosave=False,  # set True to skip saving the annotated video
    save_txt=False,  # save results to *.txt
    classes=None, agnostic_nms=False,  # NMS class filtering options
    augment=False, visualize=False, update=False,  # inference options
    project=ROOT / "runs/detect", name="exp", exist_ok=False,  # output location
    line_thickness=3, hide_labels=False, hide_conf=False,  # annotation options
    half=False, dnn=False,  # FP16 inference / OpenCV DNN backend
    vid_stride=2,  # video frame-rate stride (skip every 2nd frame)
    max_frames=100,  # number of frames to record
    output_fps=20,  # output video FPS
):
source = str(source)
# enable saving inference images
save_img = not nosave
# Directories
save_dir = increment_path(Path(project) / name, exist_ok=exist_ok)
# make dir
(save_dir / "labels" if save_txt else save_dir).mkdir(parents=True, exist_ok=True)
# Load model
device = select_device(device)
model = DetectMultiBackend(weights, device=device, dnn=dnn, data=data, fp16=half)
stride, names, pt = model.stride, model.names, model.pt
# check image size
imgsz = check_img_size(imgsz, s=stride)
# Initialize camera
cap = cv2.VideoCapture(int(source) if source.isnumeric() else source)
if not cap.isOpened():
raise ValueError(f"Failed to open camera source: {source}")
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
# Initialize video writer
save_path = str(save_dir / "output.mp4")
vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*"mp4v"), output_fps, (w, h))
# Run inference
model.warmup(imgsz=(1, 3, *imgsz)) # warmup
seen, dt = 0, (Profile(device=device), Profile(device=device), Profile(device=device))
frame_count = 0
try:
# Record the max frames specified in argument
while frame_count < max_frames:
            # Read a frame, skipping vid_stride-1 frames to lower the effective rate
            for _ in range(vid_stride):
                ret, img0 = cap.read()
            if not ret or img0 is None:
                LOGGER.info("Camera returned no frame; stopping capture")
                break
# Preprocess frame
with dt[0]:
img = cv2.resize(img0, imgsz, interpolation=cv2.INTER_LINEAR)
img = img[:, :, ::-1] # BGR to RGB
img = img.transpose((2, 0, 1)) # HWC to CHW
img = np.ascontiguousarray(img)
im = torch.from_numpy(img).to(model.device)
im = im.half() if model.fp16 else im.float() # uint8 to fp16/32
im /= 255 # 0 - 255 to 0.0 - 1.0
if len(im.shape) == 3:
im = im[None]
# Inference
with dt[1]:
pred = model(im, augment=augment, visualize=visualize)
# Non max suppression
with dt[2]:
pred = non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det)
# Process predictions
seen += 1
frame_count += 1
s = f"Frame {frame_count}/{max_frames}: {imgsz[1]}x{imgsz[0]} "
annotator = Annotator(img0, line_width=line_thickness, example=str(names))
if len(pred[0]):
# Rescale boxes from img_size to im0 size
pred[0][:, :4] = scale_boxes(im.shape[2:], pred[0][:, :4], img0.shape).round()
# Print results
for c in pred[0][:, 5].unique():
n = (pred[0][:, 5] == c).sum() # detections per class
s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "
# Write results
for *xyxy, conf, cls in reversed(pred[0]):
c = int(cls) # integer class
label = None if hide_labels else (names[c] if hide_conf else f"{names[c]} {conf:.2f}")
annotator.box_label(xyxy, label, color=colors(c, True))
# Save results to video
im0 = annotator.result()
if save_img:
vid_writer.write(im0)
# Print time (inference-only)
LOGGER.info(f"{s}{'' if len(pred[0]) else '(no detections), '}{dt[1].dt * 1e3:.1f}ms")
finally:
LOGGER.info("Starting cleanup...")
if vid_writer:
LOGGER.info("Releasing video writer...")
vid_writer.release()
if cap:
LOGGER.info("Releasing camera...")
cap.release()
LOGGER.info("Clearing PyTorch memory...")
if 'im' in locals():
del im
if 'pred' in locals():
del pred
torch.cuda.empty_cache() if torch.cuda.is_available() else None
LOGGER.info("Cleanup completed.")
# Print results
t = tuple(x.t / seen * 1e3 for x in dt) # speeds per image
LOGGER.info(f"Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {(1, 3, *imgsz)}" % t)
LOGGER.info(f"Video saved to {colorstr('bold', save_path)}")
if update:
strip_optimizer(weights[0]) # update model
def parse_opt():
#Parse command-line arguments for YOLOv5 detection on Raspberry Pi with video saving.
parser = argparse.ArgumentParser()
parser.add_argument("--weights", nargs="+", type=str, default=ROOT / "yolov5n.pt", help="model path or triton URL")
parser.add_argument("--source", type=str, default="0", help="webcam (0 for Raspberry Pi camera)")
parser.add_argument("--data", type=str, default=ROOT / "data/coco128.yaml", help="(optional) dataset.yaml path")
parser.add_argument("--imgsz", "--img", "--img-size", nargs="+", type=int, default=[320], help="inference size h,w")
parser.add_argument("--conf-thres", type=float, default=0.25, help="confidence threshold")
parser.add_argument("--exist-ok", action="store_true", help="existing project/name ok, do not increment")
parser.add_argument("--vid-stride", type=int, default=2, help="video frame-rate stride")
parser.add_argument("--max-frames", type=int, default=100, help="number of frames to record")
parser.add_argument("--output-fps", type=int, default=20, help="output video FPS")
opt = parser.parse_args()
opt.imgsz *= 2 if len(opt.imgsz) == 1 else 1 # expand
print_args(vars(opt))
return opt
def main(opt):
#Executes YOLOv5 model inference for live camera feed on Raspberry Pi, saving to video.
check_requirements(ROOT / "requirements.txt", exclude=("tensorboard", "thop"))
run(**vars(opt))
if __name__ == "__main__":
opt = parse_opt()
main(opt)