"""
ELEC 424/553 Final Project - Self-Driving Car (Raspberry Pi 5)
Team: Team 7
Members: Lola Cantillon, Areli Garcia, Alex Carneiro de Sousa, Santiago Brent
This script performs autonomous lane keeping, red stop-box detection,
and closed-loop speed control on an RC car using a Raspberry Pi 5,
USB webcam, and a custom optical encoder kernel driver.
Citations:
- raja_961, "Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV."
Instructables. https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
- graDudes, "Beaglebone Black (BBB) autonomous vehicle that drives between lines, stop at stop signs and stops at traffic lights."
Hackster.io. URL: https://www.hackster.io/439751/gradudes-9d9a6e
"""
# Standard library imports for timing, file I/O, and signal handling
import time
import csv
import signal
import sys
# OpenCV for computer vision and NumPy for array math
import cv2
import numpy as np
# Hardware PWM driver for the Pi 5 - drives the steering servo and ESC.
# Install with: pip install rpi-hardware-pwm
from rpi_hardware_pwm import HardwarePWM
# ============================================================
# CONFIGURATION CONSTANTS
# ============================================================
# Camera resolution - keep low for real-time processing on the Pi 5.
FRAME_WIDTH = 320
FRAME_HEIGHT = 240
# PWM duty cycle bounds for steering. 7.5% is approximately straight,
# 6% is full left, 9% is full right (per the project guidelines).
STEERING_NEUTRAL = 7.5
STEERING_MIN = 6.0
STEERING_MAX = 9.0
# Throttle PWM bounds. Keep low for safety and to make tuning manageable.
# Note: set_throttle clamps the lower bound to THROTTLE_NEUTRAL, not below.
# That means the speed controller can only coast (toward neutral), never
# brake. Reverse would need a separate ESC arming sequence. If the car
# runs away on a downhill, the controller is saturated at neutral - lower
# THROTTLE_BASE rather than expecting active braking.
THROTTLE_NEUTRAL = 7.5
THROTTLE_BASE = 7.9 # baseline forward speed - TUNE THIS
THROTTLE_MAX = 8.2 # cap so the car never runs away
# PD controller gains for steering. Start with Kd = 0 and small Kp,
# then increase Kp until the car oscillates, then add Kd to dampen.
KP = 0.02
KD = 0.08
# Speed control gain (proportional only). Reads the encoder rate from
# the kernel driver via sysfs and adjusts throttle to hit a target.
SPEED_KP = 0.0001
TARGET_ENCODER_PERIOD_NS = 50_000_000 # ns between encoder edges - TUNE
# Stop box detection. Red is checked every N frames to preserve loop FPS.
# TODO: tune RED_PIXEL_THRESHOLD by printing the count near a stop box.
RED_CHECK_EVERY_N_FRAMES = 3
RED_PIXEL_THRESHOLD = 2000
FIRST_STOP_DURATION_S = 3.0
STOP_COOLDOWN_FRAMES = 60 # ignore red for this many frames after a stop
# Bounding the max amount of loops.
MAX_FRAMES = 3000
# Hardware PWM configuration for the Pi 5.
# - PWM_CHIP=2 is the Pi 5's PWM controller (Pi 4 used chip 0).
# - Channel 0 maps to GPIO 12 (or 18); channel 1 maps to GPIO 13 (or 19).
# - 50 Hz is the standard servo / ESC frame rate.
# Requires `dtoverlay=pwm-2chan` (or equivalent) enabled in
# /boot/firmware/config.txt so both PWM channels are exposed.
PWM_CHIP = 2
STEERING_CHANNEL = 0
THROTTLE_CHANNEL = 1
PWM_FREQ_HZ = 50
# Seconds to hold neutral throttle on startup so the ESC arms cleanly.
ESC_ARM_SECONDS = 2.0
# Path to the sysfs file your encoder driver exposes. Created by the
# kernel module in encoder_driver.c (see that file).
ENCODER_SYSFS_PATH = "/sys/module/op_encoder_driver/parameters/last_interval_ns"
# CSV log path - used to generate the two plots required by the rubric.
LOG_PATH = "run_log.csv"
# ============================================================
# MOTOR CONTROL
# ============================================================
# Uses the rpi_hardware_pwm library, which drives the Pi 5's hardware
# PWM peripheral via /sys/class/pwm. Hardware PWM gives a steady signal
# (no jitter from Python's GIL) which the servo and ESC both need.
class MotorController:
"""Wraps hardware PWM output for the steering servo and ESC throttle."""
def __init__(self):
# Bring up both PWM channels on the Pi 5's PWM controller.
self.steering_pwm = HardwarePWM(
pwm_channel=STEERING_CHANNEL, hz=PWM_FREQ_HZ, chip=PWM_CHIP)
self.throttle_pwm = HardwarePWM(
pwm_channel=THROTTLE_CHANNEL, hz=PWM_FREQ_HZ, chip=PWM_CHIP)
# Track current duty cycle for logging and clamp checks.
self.steering_duty = STEERING_NEUTRAL
self.throttle_duty = THROTTLE_NEUTRAL
# Start both channels at neutral so the car does not lurch.
self.steering_pwm.start(STEERING_NEUTRAL)
self.throttle_pwm.start(THROTTLE_NEUTRAL)
# The following sequence is done for calibration purposes and
# it tells the ESC the range of PWM signals it will receive.
# This was adapted from the gradDudes python file with the
# BeagleBone.
print("let's calibrate the ESC!")
# ESC calibration values from the assignment.
CALIB_HIGH = 10.0 # Max throttle signal for ESC calibration
CALIB_LOW = 5.0 # Min throttle signal for ESC calibration
# Sending maximum throttle signal for two seconds.
self.throttle_pwm.change_duty_cycle(CALIB_HIGH)
self.throttle_duty = CALIB_HIGH
print(f" Sending a high signal ({CALIB_HIGH}%) to the ESC for 2 seconds...")
time.sleep(2)
# Sending minimum throttle signal for two seconds.
self.throttle_pwm.change_duty_cycle(CALIB_LOW)
self.throttle_duty = CALIB_LOW
print(f" Sending a low signal ({CALIB_LOW}%) to the ESC for 2 seconds...")
time.sleep(2)
# Return to neutral, ideally this should cause two beeps to happen.
# First one says the ESC was acknowledged, while the second one says
# it's ready to drive.
self.throttle_pwm.change_duty_cycle(THROTTLE_NEUTRAL)
self.throttle_duty = THROTTLE_NEUTRAL
print(" Setting ESC back to neutral...")
# ESCs need to see a steady neutral pulse for a moment to arm.
time.sleep(ESC_ARM_SECONDS)
def set_steering(self, duty):
# Clamp the requested duty cycle to the safe steering range.
duty = max(STEERING_MIN, min(STEERING_MAX, duty))
self.steering_duty = duty
# Push the new duty cycle to the hardware PWM channel.
self.steering_pwm.change_duty_cycle(duty)
def set_throttle(self, duty):
# Clamp to a safe throttle window so the car cannot run away.
duty = max(THROTTLE_NEUTRAL, min(THROTTLE_MAX, duty))
self.throttle_duty = duty
# Push the new duty cycle to the hardware PWM channel.
self.throttle_pwm.change_duty_cycle(duty)
def neutral(self):
# Always callable - used in the cleanup path and on Ctrl+C.
self.set_steering(STEERING_NEUTRAL)
self.set_throttle(THROTTLE_NEUTRAL)
def cleanup(self):
# Stop the motors first, then release the PWM channels so the
# next run can grab them again without a "device busy" error.
self.neutral()
# Hold neutral briefly so the ESC sees the stop pulse before
# the PWM line goes quiet.
time.sleep(0.2)
self.steering_pwm.stop()
self.throttle_pwm.stop()
# ============================================================
# LANE DETECTION
# ============================================================
# This pipeline follows the raja_961 Instructable closely:
# 1. Crop to lower half of the frame (the lane is on the ground).
# 2. Convert to HSV and mask the blue lane tape.
# 3. Run Canny edge detection, then probabilistic Hough transform.
# 4. Average left and right line slopes/intercepts to estimate
# where the lane center is at the bottom of the frame.
# 5. Compute the heading angle the car should aim for.
def detect_lane_edges(frame):
"""Return a binary edge image of just the blue lane tape."""
# Convert to HSV - hue-based color masking is more robust than RGB.
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# TODO: confirm these HSV bounds match the blue tape in your room's
# lighting. Print/visualize the mask and tweak if it picks up noise.
lower_blue = np.array([90, 80, 40])
upper_blue = np.array([130, 255, 255])
mask = cv2.inRange(hsv, lower_blue, upper_blue)
# Edges of the masked lane region - input to the Hough transform.
edges = cv2.Canny(mask, 50, 100)
return edges
def region_of_interest(edges):
"""Keep only the lower portion of the frame where the lane lives."""
height, width = edges.shape
mask = np.zeros_like(edges)
# A trapezoid focused on the lower half ignores ceiling/desk noise.
polygon = np.array([[
(0, height),
(0, height * 1 // 2),
(width, height * 1 // 2),
(width, height),
]], np.int32)
cv2.fillPoly(mask, polygon, 255)
return cv2.bitwise_and(edges, mask)
def detect_line_segments(cropped_edges):
"""Hough transform to find candidate lane-line segments."""
# TODO: tune these Hough parameters for your camera/resolution.
rho = 1
angle = np.pi / 180
min_threshold = 10
return cv2.HoughLinesP(
cropped_edges, rho, angle, min_threshold,
np.array([]), minLineLength=8, maxLineGap=4
)
def average_slope_intercept(frame, line_segments):
"""Cluster Hough segments into a single left and right lane line."""
# If no segments at all, we have no lane info this frame.
if line_segments is None:
return []
height, width, _ = frame.shape
left_fit, right_fit = [], []
# Only consider segments whose endpoints fall in the appropriate
# half of the frame (left line on the left, right line on the right).
boundary = 1 / 3
left_region_boundary = width * (1 - boundary)
right_region_boundary = width * boundary
for segment in line_segments:
for x1, y1, x2, y2 in segment:
# Skip vertical lines - slope is undefined.
if x1 == x2:
continue
fit = np.polyfit((x1, x2), (y1, y2), 1)
slope, intercept = fit[0], fit[1]
# Negative slope = left line, positive slope = right line
# (because y axis is flipped in image coordinates).
if slope < 0 and x1 < left_region_boundary and x2 < left_region_boundary:
left_fit.append((slope, intercept))
elif slope > 0 and x1 > right_region_boundary and x2 > right_region_boundary:
right_fit.append((slope, intercept))
# Average the candidate fits and convert back to (x1,y1,x2,y2).
lane_lines = []
if left_fit:
lane_lines.append(make_points(frame, np.average(left_fit, axis=0)))
if right_fit:
lane_lines.append(make_points(frame, np.average(right_fit, axis=0)))
return lane_lines
def make_points(frame, line):
"""Convert (slope, intercept) to two endpoints spanning the ROI."""
height, width, _ = frame.shape
slope, intercept = line
# Bottom of the frame down to the ROI top edge.
y1 = height
y2 = int(y1 * 1 / 2)
# Solve x = (y - b) / m, clamped to the frame width.
x1 = max(-width, min(2 * width, int((y1 - intercept) / slope)))
x2 = max(-width, min(2 * width, int((y2 - intercept) / slope)))
return [[x1, y1, x2, y2]]
def compute_steering_angle(frame, lane_lines):
"""Return the desired heading angle in degrees (90 = straight ahead)."""
# No lanes seen - tell the caller to hold the previous angle.
if len(lane_lines) == 0:
return -1
height, width, _ = frame.shape
if len(lane_lines) == 1:
# Only one line visible - aim parallel to it.
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
else:
# Two lines - aim at their midpoint at the top of the ROI.
_, _, left_x2, _ = lane_lines[0][0]
_, _, right_x2, _ = lane_lines[1][0]
mid = int(width / 2)
x_offset = (left_x2 + right_x2) / 2 - mid
y_offset = int(height / 2)
# Convert pixel offset to a steering angle relative to straight-ahead.
angle_to_mid_radian = np.arctan(x_offset / y_offset)
angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / np.pi)
return angle_to_mid_deg + 90
# ============================================================
# DISPLAY FOR DEBUGGING
# ============================================================
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
"""Overlay detected lane lines onto the frame."""
line_image = np.zeros_like(frame)
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
return cv2.addWeighted(frame, 0.8, line_image, 1, 1)
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
"""Draw the computed heading direction onto the frame."""
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
steering_angle_radian = steering_angle / 180.0 * np.pi
x1 = int(width / 2)
y1 = height
x2 = int(x1 - height / 2 / np.tan(steering_angle_radian))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
return cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
# ============================================================
# RED STOP BOX DETECTION
# ============================================================
def detect_red_stop_box(frame):
"""Return True if a sufficiently large red region is in view."""
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# Red wraps around the hue circle, so we need two ranges and OR them.
mask1 = cv2.inRange(hsv, np.array([0, 120, 70]), np.array([10, 255, 255]))
mask2 = cv2.inRange(hsv, np.array([170, 120, 70]), np.array([180, 255, 255]))
red_mask = cv2.bitwise_or(mask1, mask2)
# Only count red in the lower half - the stop paper is on the ground.
h = red_mask.shape[0]
red_count = cv2.countNonZero(red_mask[h // 2 :, :])
return red_count > RED_PIXEL_THRESHOLD
# ============================================================
# ENCODER (reads from kernel driver via sysfs)
# ============================================================
def read_encoder_period_ns():
"""Return the latest encoder edge-to-edge period in nanoseconds."""
# The kernel module writes the most recent period to this sysfs file.
# If the file is missing or empty, return None to signal "unknown".
try:
with open(ENCODER_SYSFS_PATH, "r") as f:
return int(f.read().strip())
except (OSError, ValueError):
return None
# ============================================================
# MAIN LOOP
# ============================================================
def main():
# Open the webcam. Device index may need to change on your Pi.
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
if not cap.isOpened():
print("ERROR: could not open webcam")
return
# Install a bare-bones SIGINT handler BEFORE constructing MotorController.
# MotorController.__init__ sleeps ~2s while the ESC arms, and a Ctrl+C
# during that window would otherwise skip cleanup entirely. SystemExit
# raised here unwinds into the try/finally below so motors still stop.
signal.signal(signal.SIGINT, lambda *_: sys.exit(0))
# Initialize to None so the finally clause can tell what was set up.
# If MotorController() raises (e.g., PWM overlay missing), we still
# want cap.release() to run.
motors = None
log_file = None
# try/finally guarantees motors.cleanup() runs no matter what raises
# inside the loop - bad webcam frame, OpenCV assertion, sysfs I/O
# error, etc. The rubric warns explicitly: an uncaught exception with
# the ESC still driving is the worst failure mode for this project.
try:
motors = MotorController()
# Once motors exist, swap in a graceful handler that flips a flag
# rather than calling sys.exit(). The main loop sees the flag,
# exits normally, and the finally block runs - which closes the
# CSV log so partial runs are still debuggable.
running = [True]
def request_shutdown(*_):
running[0] = False
signal.signal(signal.SIGINT, request_shutdown)
# PD controller state.
last_error = 0
last_steering_angle = 90 # 90 deg = straight
# Stop-box state machine: count detections, cooldown, final stop.
stop_count = 0
stop_cooldown = 0
stopped_permanently = False
# CSV log for the two required plots.
log_file = open(LOG_PATH, "w", newline="")
log = csv.writer(log_file)
log.writerow([
"frame", "error", "p_response", "d_response",
"steering_duty", "speed_duty", "encoder_period_ns",
])
# The bounded loop is critical - never run unbounded or the ESC
# may keep driving after Ctrl+C (per the project guidelines).
for frame_num in range(MAX_FRAMES):
# Honor a Ctrl+C from the SIGINT handler - exit the loop so
# the finally block runs (motors neutral, log flushed, cam
# released).
if not running[0]:
break
ok, frame = cap.read()
if not ok:
break
# ---------- LANE DETECTION ----------
edges = detect_lane_edges(frame)
roi = region_of_interest(edges)
segments = detect_line_segments(roi)
lane_lines = average_slope_intercept(frame, segments)
new_angle = compute_steering_angle(frame, lane_lines)
# If we lost the lane this frame, hold the last known angle
# rather than snapping the wheel to some default.
if new_angle == -1:
new_angle = last_steering_angle
last_steering_angle = new_angle
# ---------- PD STEERING ----------
# Error is degrees away from straight ahead.
error = new_angle - 90
d_error = error - last_error
p_response = KP * error
d_response = KD * d_error
steering_duty = STEERING_NEUTRAL + p_response + d_response
last_error = error
motors.set_steering(steering_duty)
# ---------- SPEED CONTROL ----------
# Read the encoder rate from your kernel driver and nudge the
# throttle toward the target period. If we have no reading
# yet, just use the baseline throttle.
period_ns = read_encoder_period_ns()
if period_ns and not stopped_permanently:
# Negative err => car too fast => slow down toward neutral.
speed_err = period_ns - TARGET_ENCODER_PERIOD_NS
speed_duty = THROTTLE_BASE + SPEED_KP * speed_err
else:
speed_duty = THROTTLE_BASE
if not stopped_permanently:
motors.set_throttle(speed_duty)
# ---------- STOP BOX ----------
# Only check every Nth frame, and only when not in cooldown.
if (frame_num % RED_CHECK_EVERY_N_FRAMES == 0
and stop_cooldown == 0
and detect_red_stop_box(frame)):
stop_count += 1
motors.set_throttle(THROTTLE_NEUTRAL)
if stop_count >= 2:
# Final stop box - shut down cleanly.
stopped_permanently = True
motors.neutral()
print("Final stop box reached.")
break
# First stop box - pause, then resume with cooldown.
print(f"Stop box #{stop_count} - pausing.")
time.sleep(FIRST_STOP_DURATION_S)
stop_cooldown = STOP_COOLDOWN_FRAMES
if stop_cooldown > 0:
stop_cooldown -= 1
# ---------- LOG + DISPLAY ----------
log.writerow([
frame_num, error, p_response, d_response,
motors.steering_duty, motors.throttle_duty,
period_ns if period_ns else -1,
])
# Show the lane overlay the most useful single view for debugging.
# Comment these out for final timed runs; imshow + X forwarding is slow.
lane_image = display_lines(frame, lane_lines)
heading_image = display_heading_line(lane_image, new_angle)
cv2.imshow("Lane Detection", heading_image)
# cv2.waitKey(1) is required for any imshow windows to update,
# even if you aren't actively displaying anything. Without it
# the OpenCV event loop never ticks.
cv2.waitKey(1)
finally:
# Guaranteed cleanup, in safety order: motors first (the only
# actuator that can hurt anything), then file handles, then cam.
if motors is not None:
motors.cleanup()
if log_file is not None:
log_file.close()
cap.release()
print("Run complete.")
if __name__ == "__main__":
main()
Comments