# Team:Ayush Suresh, Daniel Merklen Pimenta De Medeiros, Hong-Jyun Wang, Kaan Yilmaz, Leo Marek
# Class: ELEC 424
# Final Project
# Fall 2025
#
# Code drawn from:
# https://www.hackster.io/m-e-g-g/the-magnificent-m-e-g-g-car-28ec89#code
import math
import time
import cv2
import matplotlib.pyplot as plt
import numpy as np
import RPi.GPIO as GPIO
# =============================================================================
# CONFIGURATION
# =============================================================================
# --- GPIO pins ---
PIN_ESC = 18 # Drive motor (ESC)
PIN_SERVO = 19 # Steering servo
PWM_FREQ_HZ = 50
# --- ESC / Servo duty cycles (%) ---
ESC_NEUTRAL_DUTY_CYCLE = 7.3
ESC_LOW_DUTY_CYCLE = 7.6
SERVO_NEUTRAL_DUTY_CYCLE = 6.5
SERVO_MIN_DUTY_CYCLE = 4.0
SERVO_MAX_DUTY_CYCLE = 9.0
# --- ESC calibration duty cycles ---
ESC_CAL_FULL_THROTTLE = 2.0 # Full throttle signal during calibration
ESC_CAL_FULL_REVERSE = 1.0 # Full reverse signal during calibration
ESC_CAL_THROTTLE_WAIT = 2 # Seconds to hold throttle signal
ESC_CAL_REVERSE_WAIT = 2 # Seconds to hold reverse signal
ESC_CAL_NEUTRAL_WAIT = 1 # Seconds to hold neutral after calibration
# --- Camera ---
CAM_INDEX = 0
FRAME_WIDTH = 160
FRAME_HEIGHT = 120
# --- PD controller ---
KP = 0.099
KD = KP * 0.42
STEERING_STRAIGHT_DEG = 90
STEERING_MAX_ERROR_DEG = 28
STEERING_ERROR_DEADBAND_DEG = 2
STEERING_SMOOTHING_ALPHA = 0.24
STEERING_MAX_D_TERM = 0.4
STEERING_MAX_STEP_DUTY = 0.14
MIN_CONTROL_DT_SEC = 0.02
# --- Lane detection: blue HSV thresholds ---
LANE_BLUE_HSV_LOWER = np.array([90, 50, 50], dtype="uint8")
LANE_BLUE_HSV_UPPER = np.array([130, 255, 255], dtype="uint8")
BLUE_BLUR_KERNEL_SIZE = 5
MASK_MORPH_KERNEL_SIZE = 3
# --- Lane detection: Canny edge thresholds ---
CANNY_LOW_THRESHOLD = 50
CANNY_HIGH_THRESHOLD = 100
# --- Lane detection: region of interest ---
ROI_VERTICAL_FRACTION = 0.5 # Keep the bottom fraction of the frame
# --- Lane detection: Hough transform ---
HOUGH_RHO = 1
HOUGH_THETA = np.pi / 180
HOUGH_THRESHOLD = 5
HOUGH_MIN_LINE_LEN = 3
HOUGH_MAX_LINE_GAP = 50
MIN_SEGMENT_LENGTH = 12
MIN_SEGMENT_SLOPE = 0.4
LANE_X_OFFSET_DEADBAND_PX = 5
LANE_X_OFFSET_BLEND_ALPHA = 0.35
# --- Lane detection: left/right region split ---
LANE_BOUNDARY_FRACTION = 1 / 4 # Inner boundary as fraction of frame width
# --- Lane detection: degenerate slope guard ---
MIN_SLOPE = 0.1 # Replaces a zero slope to avoid division by zero
# --- Display: lane lines ---
LANE_LINE_COLOR = (0, 255, 0) # Green (BGR)
LANE_LINE_WIDTH = 6
# --- Display: heading line ---
HEADING_LINE_COLOR = (0, 0, 255) # Red (BGR)
HEADING_LINE_WIDTH = 5
# --- Display: overlay blend weight ---
FRAME_OVERLAY_ALPHA = 0.8 # Weight of original frame in addWeighted blends
# --- Stop sign (red floor) detection ---
RED_HSV_LOWER_1 = np.array([0, 40, 60], dtype="uint8")
RED_HSV_UPPER_1 = np.array([10, 255, 255], dtype="uint8")
RED_HSV_LOWER_2 = np.array([170, 40, 60], dtype="uint8")
RED_HSV_UPPER_2 = np.array([180, 255, 255], dtype="uint8")
RED_FLOOR_PCT_THRESHOLD = 18 # Minimum % of stop ROI that must be red
STOP_ROI_TOP_FRACTION = 0.25 # Start stop-marker detection one quarter down the frame
STOP_ROI_BOTTOM_FRACTION = (
0.75 # End stop-marker detection three quarters down the frame
)
# --- Main loop timing and intervals ---
MAX_TICKS = 4000 # Maximum loop iterations before forced stop
STOP_CHECK_INTERVAL = 1 # Check for stop sign every tick
SECOND_STOP_TICK_BUFFER = 100 # Ticks after first stop before watching for second
FIRST_STOP_WAIT_SEC = 3.0 # Pause duration at first stop sign (seconds)
FINAL_STOP_CONFIRM_FRAMES = (
2 # Require this many positive stop frames before final stop
)
STARTUP_WAIT_SEC = 3.0 # Wait after calibration before driving (seconds)
LOOP_DRIVE_WAIT_SEC = 0.023 # Brief drive window each tick (seconds)
SPEED_STEP_DUTY = 0.01 # Ramp low speed in gradually after stops
# --- Debug: print every N ticks to avoid terminal flood ---
DEBUG_PRINT_INTERVAL = 10
# =============================================================================
# HARDWARE SETUP
# =============================================================================
GPIO.setmode(GPIO.BCM)
GPIO.setup(PIN_ESC, GPIO.OUT)
GPIO.setup(PIN_SERVO, GPIO.OUT)
pwm_esc = GPIO.PWM(PIN_ESC, PWM_FREQ_HZ)
pwm_serv = GPIO.PWM(PIN_SERVO, PWM_FREQ_HZ)
pwm_esc.start(ESC_NEUTRAL_DUTY_CYCLE)
pwm_serv.start(SERVO_NEUTRAL_DUTY_CYCLE)
# =============================================================================
# CAR CONTROL
# =============================================================================
def reset_car():
"""Stop the car and center the steering."""
pwm_esc.ChangeDutyCycle(ESC_NEUTRAL_DUTY_CYCLE)
pwm_serv.ChangeDutyCycle(SERVO_NEUTRAL_DUTY_CYCLE)
def start_car():
"""Set the car to base speed with neutral steering."""
print("Car ready")
pwm_esc.ChangeDutyCycle(ESC_LOW_DUTY_CYCLE)
pwm_serv.ChangeDutyCycle(SERVO_NEUTRAL_DUTY_CYCLE)
def set_speed(dc):
"""Set ESC duty cycle to control drive motor speed."""
pwm_esc.ChangeDutyCycle(dc)
def set_turn(dc):
"""Set servo duty cycle to control steering angle."""
pwm_serv.ChangeDutyCycle(dc)
def clamp(value, low, high):
"""Clamp a numeric value between the given bounds."""
return max(low, min(value, high))
def rate_limit(target, current, max_delta):
"""Limit how quickly a control value may change between loop iterations."""
return current + clamp(target - current, -max_delta, max_delta)
def calibrate_esc():
"""Calibrate the ESC (only needed if lacking gray/orange/black wires)."""
print("Calibrating ESC...")
set_speed(ESC_NEUTRAL_DUTY_CYCLE)
wait(2.0)
print("Calibration done")
def wait(wait_time):
"""Busy-wait for the given number of seconds."""
end_time = time.perf_counter() + wait_time
while time.perf_counter() < end_time:
pass
# =============================================================================
# VISION PIPELINE
# =============================================================================
def detect_edges(frame, debug=False):
"""Detect blue lane lines and return a Canny edge image."""
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, LANE_BLUE_HSV_LOWER, LANE_BLUE_HSV_UPPER)
edges = cv2.Canny(mask, CANNY_LOW_THRESHOLD, CANNY_HIGH_THRESHOLD)
if debug:
blue_pixels = np.count_nonzero(mask)
edge_pixels = np.count_nonzero(edges)
print(
f" [detect_edges] blue mask pixels={blue_pixels}, edge pixels={edge_pixels}"
)
return edges
def region_of_interest(edges, debug=False):
"""Mask out the upper portion of the edge image, keeping only the lower fraction."""
height, width = edges.shape
mask = np.zeros_like(edges)
roi_top = int(height * (1 - ROI_VERTICAL_FRACTION))
polygon = np.array(
[
[
(0, height),
(0, roi_top),
(width, roi_top),
(width, height),
]
],
np.int32,
)
cv2.fillPoly(mask, polygon, 255)
roi = cv2.bitwise_and(edges, mask)
if debug:
roi_pixels = np.count_nonzero(roi)
print(
f" [region_of_interest] ROI top y={roi_top}/{height}, pixels in ROI={roi_pixels}"
)
return roi
def detect_line_segments(cropped_edges, debug=False):
"""Run Hough transform to detect line segments in the ROI."""
result = cv2.HoughLinesP(
cropped_edges,
HOUGH_RHO,
HOUGH_THETA,
HOUGH_THRESHOLD,
np.array([]),
minLineLength=HOUGH_MIN_LINE_LEN,
maxLineGap=HOUGH_MAX_LINE_GAP,
)
if debug:
n = len(result) if result is not None else 0
print(f" [detect_line_segments] Hough found {n} raw segments")
if result is not None:
for seg in result:
x1, y1, x2, y2 = seg[0]
slope = (y2 - y1) / (x2 - x1) if x2 != x1 else float("inf")
print(f" segment ({x1},{y1})->({x2},{y2}) slope={slope:.2f}")
return result
def average_slope_intercept(frame, line_segments, debug=False):
lane_lines = []
if line_segments is None:
if debug:
print(" [average_slope_intercept] No segments to process")
return lane_lines
height, width, _ = frame.shape
left_fit, right_fit = [], []
mid = width / 2
for line_segment in line_segments:
for x1, y1, x2, y2 in line_segment:
if x1 == x2:
continue # skip truly vertical lines only
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - slope * x1
x_avg = (x1 + x2) / 2
if x_avg < mid:
left_fit.append((slope, intercept))
else:
right_fit.append((slope, intercept))
if debug:
print(
f" [average_slope_intercept] left_fit={len(left_fit)}, right_fit={len(right_fit)}"
)
if left_fit:
lane_lines.append(make_points(frame, np.average(left_fit, axis=0)))
if right_fit:
lane_lines.append(make_points(frame, np.average(right_fit, axis=0)))
return lane_lines
def make_points(frame, line):
"""Convert a slope/intercept pair into pixel endpoints spanning the lower half."""
height, width, _ = frame.shape
slope, intercept = line
if slope == 0:
slope = MIN_SLOPE
y1 = height
y2 = int(y1 / 2)
x1 = int((y1 - intercept) / slope)
x2 = int((y2 - intercept) / slope)
return [[x1, y1, x2, y2]]
def display_lines(frame, lines, line_color=LANE_LINE_COLOR, line_width=LANE_LINE_WIDTH):
"""Draw detected lane lines on a copy of the frame."""
line_image = np.zeros_like(frame)
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
return cv2.addWeighted(frame, FRAME_OVERLAY_ALPHA, line_image, 1, 1)
def display_heading_line(
frame, steering_angle, line_color=HEADING_LINE_COLOR, line_width=HEADING_LINE_WIDTH
):
"""Draw the heading direction line on the frame."""
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
angle_rad = steering_angle / 180.0 * math.pi
x1 = int(width / 2)
y1 = height
x2 = int(x1 - height / 2 / math.tan(angle_rad))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
return cv2.addWeighted(frame, FRAME_OVERLAY_ALPHA, heading_image, 1, 1)
def get_steering_angle(frame, lane_lines):
"""Compute the steering angle (in degrees) from detected lane lines."""
height, width, _ = frame.shape
if len(lane_lines) == 2:
_, _, left_x2, _ = lane_lines[0][0]
_, _, right_x2, _ = lane_lines[1][0]
x_offset = (left_x2 + right_x2) / 2 - int(width / 2)
elif len(lane_lines) == 1:
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
else:
x_offset = 0
y_offset = int(height / 2)
angle_deg = int(math.atan(x_offset / y_offset) * 180.0 / math.pi)
return angle_deg + STEERING_STRAIGHT_DEG
# =============================================================================
# STOP SIGN DETECTION
# =============================================================================
def isRedFloorVisible(image):
"""
Detect whether a significant portion of the image contains red (stop zone).
Adapted from hackster.io; HSV ranges adjusted for red floor segments.
Returns:
(bool, ndarray, float): detection result, debug feed, and red coverage percentage.
"""
height, width, _ = image.shape
roi_top = int(height * STOP_ROI_TOP_FRACTION)
roi_bottom = int(height * STOP_ROI_BOTTOM_FRACTION)
roi = image[roi_top:roi_bottom, :]
hsv_img = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
# Red wraps around 0/180 degrees in HSV, so two ranges are needed
mask1 = cv2.inRange(hsv_img, RED_HSV_LOWER_1, RED_HSV_UPPER_1)
mask2 = cv2.inRange(hsv_img, RED_HSV_LOWER_2, RED_HSV_UPPER_2)
mask = cv2.bitwise_or(mask1, mask2)
kernel = np.ones((3, 3), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
pct_detected = np.count_nonzero(mask) * 100 / mask.size
is_stop = pct_detected > RED_FLOOR_PCT_THRESHOLD
annotated = image.copy()
cv2.rectangle(
annotated, (0, roi_top), (width - 1, roi_bottom - 1), (0, 255, 255), 1
)
red_overlay = roi.copy()
red_tint = np.zeros_like(roi)
red_tint[:, :, 2] = 255
tinted_roi = cv2.addWeighted(roi, 0.65, red_tint, 0.35, 0)
red_overlay[mask > 0] = tinted_roi[mask > 0]
annotated[roi_top:roi_bottom, :] = red_overlay
status = "STOP" if is_stop else "CLEAR"
status_color = (0, 0, 255) if is_stop else (0, 255, 0)
cv2.putText(
annotated,
f"{status} red={pct_detected:.1f}% threshold={RED_FLOOR_PCT_THRESHOLD}%",
(5, 18),
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
status_color,
1,
cv2.LINE_AA,
)
mask_bgr = np.zeros_like(image)
mask_bgr[roi_top:roi_bottom, :] = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
masked_color = np.zeros_like(image)
masked_color[roi_top:roi_bottom, :] = cv2.bitwise_and(roi, roi, mask=mask)
debug_view = np.hstack((annotated, mask_bgr, masked_color))
return is_stop, debug_view, pct_detected
# =============================================================================
# PLOTTING
# =============================================================================
def plot_pd(p_vals, d_vals, error, show_img=False):
"""Save a plot of P/D controller values and error over time."""
fig, ax1 = plt.subplots()
t_ax = np.arange(len(p_vals))
ax1.plot(t_ax, p_vals, "-", label="P values")
ax1.plot(t_ax, d_vals, "-", label="D values")
ax2 = ax1.twinx()
ax2.plot(t_ax, error, "--r", label="Error")
ax1.set_xlabel("Frames")
ax1.set_ylabel("PD Value")
ax2.set_ylim(-STEERING_STRAIGHT_DEG, STEERING_STRAIGHT_DEG)
ax2.set_ylabel("Error Value")
plt.title("PD Values over time")
fig.legend()
fig.tight_layout()
plt.savefig("pd_plot.png")
if show_img:
plt.show()
plt.clf()
def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
"""Save a plot of speed, steering duty cycles, and error over time."""
fig, ax1 = plt.subplots()
t_ax = np.arange(len(speed_pwms))
ax1.plot(t_ax, speed_pwms, "-", label="Speed")
ax1.plot(t_ax, turn_pwms, "-", label="Steering")
ax1.plot(t_ax, error / np.max(error), "--r", label="Error")
ax1.set_xlabel("Frames")
ax1.set_ylabel("Speed and Steer Duty Cycle")
plt.title("Speed and Steer Duty Cycle, and error vs. time")
fig.legend()
plt.savefig("voltage_plot.png")
if show_img:
plt.show()
plt.clf()
# =============================================================================
# MAIN LOOP
# =============================================================================
def main():
last_time = time.perf_counter()
last_error = 0.0
second_stop_tick = 0
final_stop_hits = 0
smoothed_error = 0.0
last_turn_amt = SERVO_NEUTRAL_DUTY_CYCLE
p_vals = [] # Proportional term history
d_vals = [] # Derivative term history
err_vals = [] # Error history
speed_vals = [] # Speed duty cycle history
steer_vals = [] # Steering duty cycle history
video = cv2.VideoCapture(CAM_INDEX)
video.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
# --- Verify camera opened and print actual resolution ---
if not video.isOpened():
print("ERROR: Could not open camera")
return
actual_w = video.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_h = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
print(
f"Camera opened: requested {FRAME_WIDTH}x{FRAME_HEIGHT}, got {actual_w}x{actual_h}"
)
start_car()
applied_speed = ESC_LOW_DUTY_CYCLE
counter = 0
calibrate_esc()
print("Calibration done, waiting to start")
wait(STARTUP_WAIT_SEC)
print("Done waiting")
passed_first_stop = False
while counter < MAX_TICKS:
# pwm_esc.ChangeDutyCycle(ESC_ZERO_DUTY_CYCLE)
ret, original_frame = video.read()
if not ret or original_frame is None:
print("Camera frame read failed, stopping")
break
# --- Debug: confirm frame shape each time ---
do_debug = 0
if do_debug:
print(f"\n[tick {counter}] frame shape={original_frame.shape}")
frame = cv2.resize(original_frame, (FRAME_WIDTH, FRAME_HEIGHT))
# Vision pipeline — pass debug flag through each stage
edges = detect_edges(frame, debug=do_debug)
roi = region_of_interest(edges, debug=do_debug)
line_segments = detect_line_segments(roi, debug=do_debug)
lane_lines = average_slope_intercept(frame, line_segments, debug=do_debug)
if do_debug:
print(f" [pipeline result] lane_lines found={len(lane_lines)}")
lane_lines_image = display_lines(frame, lane_lines)
steering_angle = get_steering_angle(frame, lane_lines)
heading_image = display_heading_line(lane_lines_image, steering_angle)
is_stop, stop_debug_image, stop_pct = isRedFloorVisible(frame)
if do_debug:
print(f" [stop detection] red={stop_pct:.1f}% stop={is_stop}")
# Debug display
cv2.imshow("ROI edges", roi)
# cv2.imshow("Lane lines", lane_lines_image)
# cv2.imshow("Heading", heading_image)
# cv2.imshow("Stop detection", stop_debug_image)
# PD control
now = time.perf_counter()
dt = max(now - last_time, MIN_CONTROL_DT_SEC)
raw_error = steering_angle - STEERING_STRAIGHT_DEG
error = clamp(raw_error, -STEERING_MAX_ERROR_DEG, STEERING_MAX_ERROR_DEG)
if abs(error) < STEERING_ERROR_DEADBAND_DEG:
error = 0
smoothed_error = (
STEERING_SMOOTHING_ALPHA * error
+ (1 - STEERING_SMOOTHING_ALPHA) * smoothed_error
)
proportional = KP * smoothed_error
derivative = KD * (smoothed_error - last_error) / dt
derivative = clamp(derivative, -STEERING_MAX_D_TERM, STEERING_MAX_D_TERM)
target_turn = clamp(
SERVO_NEUTRAL_DUTY_CYCLE + proportional + derivative,
SERVO_MIN_DUTY_CYCLE,
SERVO_MAX_DUTY_CYCLE,
)
turn_amt = rate_limit(target_turn, last_turn_amt, STEERING_MAX_STEP_DUTY)
pwm_serv.ChangeDutyCycle(turn_amt)
# Stop sign detection
skip_drive_this_tick = False
should_check_stop = counter % STOP_CHECK_INTERVAL == 0
if should_check_stop:
if not passed_first_stop:
if is_stop:
pwm_esc.ChangeDutyCycle(ESC_NEUTRAL_DUTY_CYCLE)
applied_speed = ESC_NEUTRAL_DUTY_CYCLE
wait(FIRST_STOP_WAIT_SEC)
passed_first_stop = True
second_stop_tick = counter
skip_drive_this_tick = True
elif counter > second_stop_tick + SECOND_STOP_TICK_BUFFER:
if is_stop:
final_stop_hits += 1
else:
final_stop_hits = max(final_stop_hits - 1, 0)
if final_stop_hits >= FINAL_STOP_CONFIRM_FRAMES:
pwm_esc.ChangeDutyCycle(ESC_NEUTRAL_DUTY_CYCLE)
applied_speed = ESC_NEUTRAL_DUTY_CYCLE
print("Reached final stop")
break
if skip_drive_this_tick:
applied_speed = ESC_NEUTRAL_DUTY_CYCLE
else:
applied_speed = ESC_LOW_DUTY_CYCLE
p_vals.append(proportional)
d_vals.append(derivative)
err_vals.append(smoothed_error)
speed_vals.append(applied_speed)
steer_vals.append(turn_amt)
if not skip_drive_this_tick:
pwm_esc.ChangeDutyCycle(applied_speed)
wait(LOOP_DRIVE_WAIT_SEC)
if cv2.waitKey(1) == 27: # ESC key to quit
break
last_error = smoothed_error
last_turn_amt = turn_amt
last_time = now
counter += 1
# Shutdown
reset_car()
video.release()
cv2.destroyAllWindows()
plot_pd(p_vals, d_vals, err_vals)
plot_pwm(speed_vals, steer_vals, err_vals)
if __name__ == "__main__":
main()
Comments