This is an autonomous lane-keeping RC car developed as the final project for ELEC 553. It implements a lane-keeping pipeline that drives between blue lanes and stops at red paper markers, plus object recognition using Ultralytics YOLO.
We chose the 160x120 resolution by testing the program's performance and finding the resolution at which the car could still recognize the lanes reliably while running at an acceptable frame rate. For steering, the region of interest was set to the bottom 15% of the frame so the controller focuses on the nearest lane markings. The proportional gain of 0.06 was found by testing for the lowest value at which the car still reacted correctly to the lanes turning, ensuring the car turns at the same rate that the lanes curve. Lastly, for the derivative gain we chose a value of 0, because in our testing a nonzero derivative term would sometimes cause the car to turn incorrectly, while keeping it at 0 let the car turn smoothly.
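As a minimal sketch of that steering update (the full driving program appears later in this write-up; steering_duty is an illustrative helper name, not a function from our code):

KP, KD = 0.06, 0.0                   # gains from the tuning discussion above
BASE_TURN = 7.5                      # neutral steering duty cycle (%)
STEER_MIN, STEER_MAX = 6.0, 9.0      # steering duty-cycle limits

def steering_duty(error, last_error, dt):
    """error = steering_angle - 90 (degrees); dt = loop period (seconds)."""
    p = KP * error
    d = KD * (error - last_error) / dt if dt > 0 else 0.0
    return max(STEER_MIN, min(STEER_MAX, BASE_TURN + p + d))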
We handled stop paper detection using HSV color thresholding restricted to the bottom 30% of the camera frame, which helped ensure the car only triggered a stop when the paper was directly in front of it rather than reacting to red objects farther down the track. Once the first stop paper was detected, the car halted by setting the ESC PWM duty cycle to neutral, paused for three seconds, then resumed lane keeping by returning to its driving duty cycle. To handle the two stop boxes on the track, we used a flag to track which stop the car had already reached, causing a temporary pause on the first detection and a permanent stop on the second.
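A minimal sketch of that check (thresholds mirror the isRedFloorVisible() function in the full program below; red wraps around the HSV hue circle, so two hue ranges are combined):

import cv2
import numpy as np

def red_floor_percentage(frame_bgr):
    h = frame_bgr.shape[0]
    roi = frame_bgr[int(h * 0.7):, :]  # bottom 30% of the frame only
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    mask1 = cv2.inRange(hsv, np.array([0, 40, 60]), np.array([10, 255, 255]))
    mask2 = cv2.inRange(hsv, np.array([170, 40, 60]), np.array([180, 255, 255]))
    mask = cv2.bitwise_or(mask1, mask2)
    return np.count_nonzero(mask) * 100.0 / mask.size  # percent red pixels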
For object recognition, we used the pre-trained YOLOv5 model from Ultralytics, loaded with weights downloaded from their GitHub release. We wrote a program that runs inference on each webcam frame at 320×320 resolution and overlays bounding boxes and labels for detections above a 0.25 confidence threshold. To measure the performance impact, we tracked both per-frame total time and per-inference time, then reported a 30-frame rolling FPS average. On the Pi 5 CPU, the program ran at roughly 3 FPS, with an average of 2.65 FPS after 30 seconds, which was the main limitation of the current setup. A more lightweight object detection model could improve the frame rate, but it may reduce detection accuracy.
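A minimal sketch of the rolling-FPS bookkeeping (the demo code below uses a plain list; this sketch substitutes a deque for brevity, and update_rolling_fps is an illustrative helper name):

import time
from collections import deque

frame_times = deque(maxlen=30)       # rolling window of per-frame durations
prev_time = time.perf_counter()

def update_rolling_fps():
    """Record one frame boundary and return the rolling-average FPS."""
    global prev_time
    now = time.perf_counter()
    frame_times.append(now - prev_time)
    prev_time = now
    avg_dt = sum(frame_times) / len(frame_times)
    return 1.0 / avg_dt if avg_dt > 0 else 0.0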
Below are plots of speed, steering, and error versus frame number, and of our P and D responses and error versus frame number.
Below is a picture of our final vehicle with the hardware on it.
Below is a video of our vehicle successfully following the lane and stopping at red stop papers.
Below is a video of our object detection code running live with webcam and Raspberry Pi setup.
References:
Jocher, G., Qiu, J., & Chaurasia, A. (2023). Ultralytics YOLO (Version 8.0.0) [Computer software]. https://github.com/ultralytics/ultralytics
User raja_961, “Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV”. Instructables. https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
Faith Mulugeta, Michelle Zheng, Yurie Han, & Maya Borowicz. (2023). "Team ELEC424 Dreamhouse". Hackster. Previous repository we used as a reference. https://www.hackster.io/team-elec424-dreamhouse/team-elec424-dreamhouse-c5ad62
# ELEC 553 Final Project - Object Recognition Demo
# Standalone YOLO inference on webcam frames, with FPS reporting and video output.
import csv
import time
import warnings
import cv2
from ultralytics import YOLO
warnings.filterwarnings("ignore")
# ----------------------------
# Configuration
# ----------------------------
CAM_INDEX = 0
FRAME_WIDTH = 320
FRAME_HEIGHT = 320
INFERENCE_SIZE = 320
MODEL_PATH = "yolov5su.pt"
OUTPUT_VIDEO = "yolo_output.mp4"
FPS_LOG_CSV = "yolo_fps_log.csv"
CONF_THRESHOLD = 0.25
FPS_PRINT_EVERY = 10
FPS_WINDOW = 30
"""
Function to draw detections, given a
results list from the model.
Returns the number of detections that
are above the confidence threshold.
"""
def draw_detections(frame, results, class_names):
detection_count = 0
# go through each potential detection
for box in results[0].boxes:
# get the confidence
confidence = float(box.conf[0])
        # skip detections below the confidence threshold
if confidence < CONF_THRESHOLD:
continue
# get the bounding box of the detection
x1, y1, x2, y2 = map(int, box.xyxy[0])
# get the object name associated with this detection.
class_id = int(box.cls[0])
class_name = class_names[class_id]
# now display the name and its confidence
# and increment the detections
label = f"{class_name} {confidence:.2f}"
detection_count += 1
# create a rectangle and add text for this detection
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
# add the label for what the object is, and our confidence.
cv2.putText(
frame,
label,
(x1, max(20, y1 - 8)),
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
(0, 255, 0),
2,
)
    # return the number of confident detections for this frame
return detection_count
def main():
print("Loading YOLO model...")
model = YOLO(MODEL_PATH)
print(f"Model loaded: {MODEL_PATH}")
# open the capture.
cap = cv2.VideoCapture(CAM_INDEX)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
if not cap.isOpened():
print("ERROR: could not open webcam.")
return
# set up the video feed and writer.
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(
OUTPUT_VIDEO,
fourcc,
10.0,
(FRAME_WIDTH, FRAME_HEIGHT),
)
    # frame_times holds the most recent per-frame durations,
    # used to compute the rolling FPS average.
frame_times = []
frame_count = 0
start_time = time.perf_counter()
previous_time = start_time
# open the csv log!
with open(FPS_LOG_CSV, "w", newline="") as csv_file:
# set up the csv log.
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["frame", "instant_fps", "rolling_fps", "detections"])
print("YOLO demo running.")
print("Press q in the video window to stop.")
while True:
# get the current camera feed for this frame!
ret, frame = cap.read()
if not ret:
print("WARNING: failed to read frame.")
break
frame = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))
# get the time needed for an inference
# to complete.
inference_start = time.perf_counter()
# run the model
results = model(frame, imgsz=INFERENCE_SIZE, verbose=False)
# get time after
inference_end = time.perf_counter()
# with the model's results, display the
# detections with rectangles.
# and get the detection count from the
# function, as well.
detection_count = draw_detections(frame, results, model.names)
            # get the total frame time (inference plus drawing/display)
now = time.perf_counter()
total_frame_time = now - previous_time
previous_time = now
# calculate the fps for both
# inference and total.
instant_fps = 1.0 / total_frame_time if total_frame_time > 0 else 0.0
inference_fps = (
1.0 / (inference_end - inference_start)
if inference_end > inference_start
else 0.0
)
            # add this frame time to the rolling window;
            # if the window is full, drop the oldest entry.
frame_times.append(total_frame_time)
if len(frame_times) > FPS_WINDOW:
frame_times.pop(0)
# get the average fps of this current window of frametimes.
rolling_dt = sum(frame_times) / len(frame_times)
rolling_fps = 1.0 / rolling_dt if rolling_dt > 0 else 0.0
# increment the frames we've iterated through
# and write our current frame, current fps, average fps, and detection count
frame_count += 1
csv_writer.writerow(
[
frame_count,
f"{instant_fps:.3f}",
f"{rolling_fps:.3f}",
detection_count,
]
)
# display the FPS stats
cv2.putText(
frame,
f"FPS: {rolling_fps:.2f} | Inference FPS: {inference_fps:.2f}",
(10, 22),
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
(0, 255, 255),
1,
)
# display the number of things detected by YOLO.
cv2.putText(
frame,
f"Detections: {detection_count}",
(10, 42),
cv2.FONT_HERSHEY_SIMPLEX,
0.45,
(0, 255, 255),
1,
)
            # write the annotated frame to the output video, then display it.
writer.write(frame)
cv2.imshow("ELEC 553 YOLO Object Recognition", frame)
# if FPS_PRINT_EVERY frames have passed
# then print the stats in console.
if frame_count % FPS_PRINT_EVERY == 0:
print(
f"frame={frame_count} "
f"rolling_fps={rolling_fps:.2f} "
f"inference_fps={inference_fps:.2f} "
f"detections={detection_count}"
)
# exit key
if cv2.waitKey(1) & 0xFF == ord("q"):
break
    # compute total runtime and the overall average FPS
    # (the average over the whole run, not just the rolling window).
total_time = time.perf_counter() - start_time
average_fps = frame_count / total_time if total_time > 0 else 0.0
    # release the camera and the video writer, then close all OpenCV windows.
cap.release()
writer.release()
cv2.destroyAllWindows()
print("YOLO demo finished.")
print(f"Frames processed: {frame_count}")
print(f"Total runtime: {total_time:.2f} seconds")
print(f"Average FPS: {average_fps:.2f}")
print(f"Saved annotated video: {OUTPUT_VIDEO}")
print(f"Saved FPS log: {FPS_LOG_CSV}")
# Entry point.
if __name__ == "__main__":
main()
# ELEC 553 Final Project
# Self-driving RC car: lane keeping, red stop box detection, and encoder-based speed control.
# References:
# raja_961, "Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV"
# https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# prior 424 project on Hackster: https://www.hackster.io/team-6/lane-keeping-rc-car-ebdd08
import math
import os
import time
import cv2
import matplotlib.pyplot as plt
import numpy as np
import RPi.GPIO as GPIO
# ============================================================
# GPIO / PWM setup constants
# ============================================================
# GPIO pins for ESC (drive motor) and steering servo
ESC_PIN = 18
SERVO_PIN = 19
# Both ESC and servo expect a 50 Hz PWM signal (20 ms period)
PWM_FREQ = 50
# 7.5% duty cycle is neutral for both the ESC and the servo
ZERO_SPEED = 7.5
STRAIGHT_STEER = 7.5
# ============================================================
# Tuning variables for our specific car
# ============================================================
# Speed values found through floor testing: the car only moves above 7.91
BASE_SPEED = 7.91
MAX_SPEED = 7.944
MIN_SPEED = 7.9
# Lane-detection ROI: only look at the bottom 15% of the frame
ROI_RATIO = 0.85
# Step size used by the encoder feedback loop
SPEED_CHANGE = 0.001
# ============================================================
# Steering tuning (PD controller)
# ============================================================
# Steering centerpoint
BASE_TURN = STRAIGHT_STEER
# Proportional and derivative gains
KP = 0.06
KD = 0.01
# Hard limits on the servo command: the steering mechanical range is 6%-9% duty
STEER_MIN = 6.0
STEER_MAX = 9.0
# ============================================================
# Camera setup
# ============================================================
# Lower resolution to keep OpenCV processing fast on Pi
FRAME_WIDTH = 160
FRAME_HEIGHT = 120
CAM_IDX = 0
# ============================================================
# Encoder setup (sysfs paths exposed by the kernel driver)
# ============================================================
ENCODER_PERIOD_PATH = "/sys/module/encoder/parameters/encoder_period_ns"
ENCODER_COUNT_PATH = "/sys/module/encoder/parameters/encoder_count"
# Target encoder period band (ns between pulses)
# Smaller period = faster wheel; larger period = slower wheel
TARGET_PERIOD_MIN = 8_000_000
TARGET_PERIOD_MAX = 40_000_000
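# (one pulse every 8 ms to 40 ms, i.e. roughly 25 to 125 pulses per second)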
# How long since the last pulse before we treat the wheel as stopped
ENCODER_STALE_S = 0.3
# ============================================================
# Red stop box setup
# ============================================================
# Percentage of red pixels in the ROI required to count as a stop box
RED_THRESHOLD_PCT = 8.0
# How long to pause at the first stop box
STOP_PAUSE_S = 3.0
# Frames to ignore red detection after the first stop, so we don't re-trigger
SECOND_STOP_IGNORE_TICKS = 100
# ============================================================
# Loop setup
# ============================================================
# Hard cap on loop iterations so the motor can't run forever on a crash
MAX_TICKS = 4000
# Approx 50 Hz loop rate target
RUN_DELAY_S = 0.023
# ============================================================
# Module-level state
# ============================================================
# Global PWM handles set up in setup_gpio()
pwm_esc = None
pwm_serv = None
# Encoder freshness tracking
last_encoder_count = 0
last_encoder_time = 0.0
# Stored encoder values for debug / inspection
global_enc_vals = []
# ============================================================
# Motor helpers
# ============================================================
def setup_gpio():
# Initialize GPIO and start both PWM channels at neutral.
global pwm_esc, pwm_serv
GPIO.setmode(GPIO.BCM)
GPIO.setup(ESC_PIN, GPIO.OUT)
GPIO.setup(SERVO_PIN, GPIO.OUT)
# Create PWM channels at 50 Hz on both pins
pwm_esc = GPIO.PWM(ESC_PIN, PWM_FREQ)
pwm_serv = GPIO.PWM(SERVO_PIN, PWM_FREQ)
# Start both at neutral so the ESC arms cleanly
pwm_esc.start(ZERO_SPEED)
pwm_serv.start(STRAIGHT_STEER)
# Give the ESC a couple seconds to recognize neutral and arm
time.sleep(2)
print("GPIO/PWM initialized.")
def reset_car():
# Set the speed motor to neutral and the steering straight.
if pwm_esc is not None:
pwm_esc.ChangeDutyCycle(ZERO_SPEED)
if pwm_serv is not None:
pwm_serv.ChangeDutyCycle(STRAIGHT_STEER)
print("Car stopped and steering straight.")
def start_car():
# Bring the car up to base speed once the user has confirmed the start.
print("Car initialized: starting drive.")
pwm_serv.ChangeDutyCycle(STRAIGHT_STEER)
pwm_esc.ChangeDutyCycle(BASE_SPEED)
def cleanup_gpio():
# Always called on exit so the motor doesn't keep driving.
global pwm_esc, pwm_serv
try:
# First put outputs back to neutral
reset_car()
time.sleep(0.3)
# Then stop both PWM channels
if pwm_esc is not None:
pwm_esc.stop()
if pwm_serv is not None:
pwm_serv.stop()
except Exception as e:
print("cleanup warning:", e)
# Drop references so __del__ doesn't fire on already-cleaned objects
pwm_esc = None
pwm_serv = None
GPIO.cleanup()
print("GPIO cleaned up.")
def clamp(value, low, high):
# Keep a value inside a safe range.
return max(low, min(high, value))
# ============================================================
# Encoder speed control
# ============================================================
def read_encoder_period_ns():
# Read latest pulse-to-pulse period (ns) from the kernel driver.
# Returns None if the wheel has stopped (count hasn't changed recently).
global last_encoder_count, last_encoder_time
now = time.time()
    # Driver might not be loaded: bail out quietly
if not (os.path.exists(ENCODER_PERIOD_PATH) and os.path.exists(ENCODER_COUNT_PATH)):
return None
# Read both sysfs files (period and pulse count)
try:
with open(ENCODER_PERIOD_PATH, "r") as f:
period = int(f.read().strip())
with open(ENCODER_COUNT_PATH, "r") as f:
count = int(f.read().strip())
except (OSError, ValueError):
return None
# If pulse count has gone up, the wheel is still spinning
if count != last_encoder_count:
last_encoder_count = count
last_encoder_time = now
# If we haven't seen a fresh pulse in a while, treat as stopped
if now - last_encoder_time > ENCODER_STALE_S:
return None
return period if period > 0 else None
def manage_speed(curr_speed):
# Bang-bang controller: nudge throttle up or down based on encoder period.
period = read_encoder_period_ns()
if period is None:
        # No encoder reading: nudge forward, assuming stopped or stalled
reason = "NO_ENCODER"
new_speed = curr_speed + 0.0005
elif period < TARGET_PERIOD_MIN:
        # Wheel spinning too fast: back off
reason = "TOO_FAST"
new_speed = curr_speed - SPEED_CHANGE
elif period > TARGET_PERIOD_MAX:
        # Wheel spinning too slow: push harder
reason = "TOO_SLOW"
new_speed = curr_speed + SPEED_CHANGE
else:
        # Within target band: leave throttle alone
reason = "GOOD"
new_speed = curr_speed
# Clamp to safe range no matter what the controller wants
new_speed = clamp(new_speed, MIN_SPEED, MAX_SPEED)
global_enc_vals.append(period if period is not None else 0)
print(
f"speed_ctrl period={period} reason={reason} speed={curr_speed:.3f}->{new_speed:.3f}"
)
return new_speed
# ============================================================
# Vision: lane detection
# ============================================================
def detect_edges(frame):
# Detect blue lane tape using HSV thresholding then Canny edges.
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# Looser saturation/value mins so shadowed blue tape still gets caught
lower_blue = np.array([90, 30, 30], dtype="uint8")
upper_blue = np.array([130, 255, 255], dtype="uint8")
# Build the blue mask, then run edge detection on it
mask = cv2.inRange(hsv, lower_blue, upper_blue)
edges = cv2.Canny(mask, 50, 100)
return edges
def region_of_interest(edges, roi_ratio=ROI_RATIO):
# Mask out everything but the bottom slice of the frame.
h, w = edges.shape
mask = np.zeros_like(edges)
# Polygon covers the lower (1 - roi_ratio) of the frame
top_y = int(h * roi_ratio)
polygon = np.array(
[
[
(0, h),
(0, top_y),
(w, top_y),
(w, h),
]
],
np.int32,
)
cv2.fillPoly(mask, polygon, 255)
return cv2.bitwise_and(edges, mask)
def detect_line_segments(cropped_edges):
# Probabilistic Hough transform returns a list of [x1,y1,x2,y2] line segments.
return cv2.HoughLinesP(
cropped_edges,
rho=1,
theta=np.pi / 180,
threshold=10,
lines=np.array([]),
minLineLength=5,
maxLineGap=150,
)
def average_slope_intercept(frame, line_segments):
# Group line segments into left/right based on slope, then average each group.
lane_lines = []
if line_segments is None:
return lane_lines
height, width, _ = frame.shape
left_fit = []
right_fit = []
    # Reject segments that extend into the far third of the opposite side
boundary = 1 / 3
left_region_boundary = width * (1 - boundary)
right_region_boundary = width * boundary
# Iterate over every detected segment
for line_segment in line_segments:
for x1, y1, x2, y2 in line_segment:
# Skip vertical lines (would divide by zero)
if x1 == x2:
continue
# Compute slope and intercept for this segment
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - (slope * x1)
# Negative slope = left lane (in image coords with y growing down)
if slope < 0:
if x1 < left_region_boundary and x2 < left_region_boundary:
left_fit.append((slope, intercept))
else:
# Positive slope = right lane
if x1 > right_region_boundary and x2 > right_region_boundary:
right_fit.append((slope, intercept))
# Average each side's segments into a single representative line
if len(left_fit) > 0:
lane_lines.append(make_points(frame, np.average(left_fit, axis=0)))
if len(right_fit) > 0:
lane_lines.append(make_points(frame, np.average(right_fit, axis=0)))
return lane_lines
def make_points(frame, line):
# Convert a (slope, intercept) pair into two endpoint coords for drawing.
height, width, _ = frame.shape
slope, intercept = line
    # Line runs from the bottom of the frame up to the vertical midpoint
y1 = height
y2 = int(y1 / 2)
# Avoid division by zero on near-horizontal lines
if abs(slope) < 0.1:
slope = 0.1 if slope >= 0 else -0.1
# Solve for x given y at each end
x1 = int((y1 - intercept) / slope)
x2 = int((y2 - intercept) / slope)
return [[x1, y1, x2, y2]]
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
# Draw averaged lane lines onto a copy of the frame.
line_image = np.zeros_like(frame)
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(
line_image,
(int(x1), int(y1)),
(int(x2), int(y2)),
line_color,
line_width,
)
# Blend the line overlay with the original frame
return cv2.addWeighted(frame, 0.8, line_image, 1, 1)
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
# Draw the red intended-heading line for visualization.
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
# Convert the steering angle to a line going up from the bottom-center
steering_angle_radian = steering_angle / 180.0 * math.pi
x1 = int(width / 2)
y1 = height
# Guard against tan blowups when angle is exactly 90 deg
tan_val = math.tan(steering_angle_radian)
if abs(tan_val) < 1e-3:
x2 = x1
else:
x2 = int(x1 - height / 2 / tan_val)
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
return cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
def get_steering_angle(frame, lane_lines):
# Convert detected lane line positions into a steering angle around 90 deg.
height, width, _ = frame.shape
mid = int(width / 2)
    # Both lanes detected: steer toward the average of their top points
if len(lane_lines) >= 2:
_, _, left_x2, _ = lane_lines[0][0]
_, _, right_x2, _ = lane_lines[1][0]
x_offset = (left_x2 + right_x2) / 2 - mid
y_offset = int(height * 0.3)
    # Only one lane: use the line's slope direction directly
elif len(lane_lines) == 1:
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
y_offset = int(height * 0.5)
    # No lanes: go straight
else:
x_offset = 0
y_offset = int(height * 0.75)
# Convert offset into degrees off-straight, then shift to 90-centered
angle_to_mid_radian = math.atan(x_offset / y_offset)
angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
steering_angle = angle_to_mid_deg + 90
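    # Worked example: x_offset = 20 px with y_offset = 36 px (30% of a 120 px
    # frame) gives atan(20/36) ~= 29 deg, so steering_angle ~= 119 (steer right).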
return steering_angle
# ============================================================
# Vision: red stop box detection
# ============================================================
def isRedFloorVisible(image):
# Return (is_stop, mask, percentage) for red detection in the lower frame.
h, w = image.shape[:2]
    # Only look at the bottom 30% of the frame: the stop paper is on the ground
lower_img = image[int(h * 0.7) :, :]
hsv_img = cv2.cvtColor(lower_img, cv2.COLOR_BGR2HSV)
# Red wraps around the hue circle, so we need two ranges
lower_red1 = np.array([0, 40, 60], dtype="uint8")
upper_red1 = np.array([10, 255, 255], dtype="uint8")
lower_red2 = np.array([170, 40, 60], dtype="uint8")
upper_red2 = np.array([180, 255, 255], dtype="uint8")
# Build the two masks and combine them
mask1 = cv2.inRange(hsv_img, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv_img, lower_red2, upper_red2)
mask = cv2.bitwise_or(mask1, mask2)
# Compute the percentage of red pixels and check the threshold
percentage_detected = np.count_nonzero(mask) * 100.0 / mask.size
result = percentage_detected > RED_THRESHOLD_PCT
if result:
print(f"Red detected: {percentage_detected:.1f}%")
return result, mask, percentage_detected
# ============================================================
# Plot helpers
# ============================================================
def plot_pd(p_vals, d_vals, error, show_img=False):
# Required plot 1: P response, D response, and error vs frame number.
plt.figure()
t_ax = np.arange(len(p_vals))
plt.plot(t_ax, p_vals, "-", label="Proportional response")
plt.plot(t_ax, d_vals, "-", label="Derivative response")
plt.plot(t_ax, error, "--", label="Error")
plt.xlabel("Frames")
plt.legend()
plt.title("PD and Error Value with Time")
plt.savefig("PDError.png")
if show_img:
plt.show()
plt.clf()
def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
# Required plot 2: speed duty, steering duty, and error vs frame number.
plt.figure()
t_ax = np.arange(len(speed_pwms))
plt.plot(t_ax, speed_pwms, "-", label="Speed duty cycle")
plt.plot(t_ax, turn_pwms, "-", label="Steering duty cycle")
plt.plot(t_ax, error, "--", label="Error")
plt.xlabel("Frames")
plt.legend()
plt.title("Speed, Steering, and Error with Time")
plt.savefig("SpeedSteeringError.png")
if show_img:
plt.show()
plt.clf()
def save_csv(frames, errors, speeds, steers, p_vals, d_vals):
# Save raw values so plots can be regenerated later if needed.
if len(frames) == 0:
return
# Pack columns and write a header for clarity
data = np.column_stack([frames, errors, speeds, steers, p_vals, d_vals])
np.savetxt(
"run_log.csv",
data,
delimiter=",",
header="frame,error,speed_duty,steering_duty,p_response,d_response",
comments="",
)
# ============================================================
# Frame processing used in both preview and driving loops
# ============================================================
def process_frame(original_frame):
# Resize once and run the full vision pipeline on a single frame.
frame = cv2.resize(original_frame, (FRAME_WIDTH, FRAME_HEIGHT))
# Lane detection pipeline: edges -> ROI -> Hough -> averaging
edges = detect_edges(frame)
roi = region_of_interest(edges, roi_ratio=ROI_RATIO)
line_segments = detect_line_segments(roi)
lane_lines = average_slope_intercept(frame, line_segments)
lane_lines_image = display_lines(frame, lane_lines)
# Compute steering angle and overlay the heading line
steering_angle = get_steering_angle(frame, lane_lines)
heading_image = display_heading_line(lane_lines_image, steering_angle)
# Red stop box check on the same frame
is_stop, red_mask, red_pct = isRedFloorVisible(frame)
# Error in degrees from straight (90)
error = steering_angle - 90
return (
frame,
edges,
lane_lines,
steering_angle,
heading_image,
is_stop,
red_pct,
error,
)
# ============================================================
# Main program
# ============================================================
def main():
# Local state for the run
last_time = time.time()
last_error = 0
second_stop_tick = 0
passed_first_stop_sign = False
# Buffers for the required plots and the CSV log
frames = []
p_vals = []
d_vals = []
err_vals = []
speed_vals = []
steer_vals = []
# Open the camera
video = cv2.VideoCapture(CAM_IDX, cv2.CAP_V4L2)
video.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
if not video.isOpened():
print("ERROR: camera could not open")
return
# ------------------------------------------------------------
# Preview loop: hold motor at neutral, show camera feed,
# wait for user to press 's' to start driving.
# ------------------------------------------------------------
print("Preview mode. Press 's' in the OpenCV window to start.")
print("Press 'q' in the OpenCV window to quit.")
reset_car()
preview_counter = 0
while True:
# Grab a frame and process it (no motor commands here)
ret, original_frame = video.read()
if not ret:
print("No frame in preview")
continue
_, edges, lane_lines, steering_angle, heading_image, _, red_pct, error = (
process_frame(original_frame)
)
preview_counter += 1
# Overlay debug info on the frame
cv2.putText(
heading_image,
f"PREVIEW err={error:+d} angle={steering_angle} lanes={len(lane_lines)} red={red_pct:.1f}% | s=start q=quit",
(5, 15),
cv2.FONT_HERSHEY_SIMPLEX,
0.35,
(255, 255, 255),
1,
)
# Show both the heading image and the raw edges
cv2.imshow("self_drive", heading_image)
cv2.imshow("edges", edges)
# Periodic terminal log so we can see the controller is alive
if preview_counter % 10 == 0:
print(
f"PREVIEW f={preview_counter} err={error:+d} angle={steering_angle} lanes={len(lane_lines)} red={red_pct:.1f}%"
)
# Check for keypress to start or quit
key = cv2.waitKey(1) & 0xFF
if key == ord("s"):
print("Starting car.")
break
if key == ord("q"):
print("Quit before start.")
return
# ------------------------------------------------------------
# Driving loop: car runs the track until done or MAX_TICKS hits
# ------------------------------------------------------------
curr_speed = BASE_SPEED
counter = 0
start_car()
last_time = time.time()
while counter < MAX_TICKS:
# Grab and process the next frame
ret, original_frame = video.read()
if not ret:
print("No frame while driving")
continue
_, edges, lane_lines, steering_angle, heading_image, is_stop, red_pct, error = (
process_frame(original_frame)
)
# --- PD steering control ---
now = time.time()
dt = now - last_time
last_time = now
if dt <= 0:
dt = 1e-3
# Compute P and D contributions
proportional = KP * error
derivative = KD * (error - last_error) / dt
last_error = error
# Combine and clamp to mechanical range
turn_amt = BASE_TURN + proportional + derivative
turn_amt = clamp(turn_amt, STEER_MIN, STEER_MAX)
pwm_serv.ChangeDutyCycle(turn_amt)
# Append values for the required plots
frames.append(counter)
p_vals.append(proportional)
d_vals.append(derivative)
err_vals.append(error)
speed_vals.append(curr_speed)
steer_vals.append(turn_amt)
# --- Stop box state machine (checked every 10 ticks) ---
if counter % 10 == 0:
if not passed_first_stop_sign:
# Looking for the first stop box
if is_stop:
print("FIRST STOP BOX detected. Pausing.")
pwm_esc.ChangeDutyCycle(ZERO_SPEED)
time.sleep(STOP_PAUSE_S)
passed_first_stop_sign = True
second_stop_tick = counter
last_time = time.time()
elif counter > second_stop_tick + SECOND_STOP_IGNORE_TICKS:
                # Cooldown over: looking for the second/final stop box
if is_stop:
print("SECOND STOP BOX detected. Final stop.")
pwm_esc.ChangeDutyCycle(ZERO_SPEED)
break
# --- Encoder-based speed control (every 3 ticks) ---
if counter % 3 == 0:
curr_speed = manage_speed(curr_speed)
# In sharp turns, drop to minimum throttle so the servo has more authority
if abs(error) > 15:
applied_speed = MIN_SPEED
else:
applied_speed = curr_speed
# Apply the speed to the ESC
pwm_esc.ChangeDutyCycle(applied_speed)
# --- Debug overlays ---
cv2.putText(
heading_image,
f"DRIVE err={error:+d} steer={turn_amt:.2f} speed={curr_speed:.3f} red={red_pct:.1f}%",
(5, 15),
cv2.FONT_HERSHEY_SIMPLEX,
0.35,
(255, 255, 255),
1,
)
cv2.imshow("self_drive", heading_image)
cv2.imshow("edges", edges)
# Terminal print every 5 frames so the operator can see what's happening
if counter % 5 == 0:
period = read_encoder_period_ns()
print(
f"DRIVE f={counter} err={error:+d} P={proportional:+.2f} "
f"D={derivative:+.2f} steer={turn_amt:.2f} speed={curr_speed:.3f} "
f"lanes={len(lane_lines)} red={red_pct:.1f}% period={period}"
)
# Allow a manual quit via 'q' or Escape
key = cv2.waitKey(1) & 0xFF
if key == ord("q") or key == 27:
print("User quit.")
break
counter += 1
time.sleep(RUN_DELAY_S)
# ------------------------------------------------------------
# Shutdown
# ------------------------------------------------------------
reset_car()
video.release()
cv2.destroyAllWindows()
# Write out plots and raw CSV log
plot_pd(p_vals, d_vals, err_vals)
plot_pwm(speed_vals, steer_vals, err_vals)
save_csv(frames, err_vals, speed_vals, steer_vals, p_vals, d_vals)
print("Saved PDError.png, SpeedSteeringError.png, and run_log.csv")
# ============================================================
# Entry point: wrap main in try/finally so cleanup always runs
# ============================================================
if __name__ == "__main__":
try:
setup_gpio()
main()
finally:
cleanup_gpio()
// Optical speed encoder kernel driver
// ELEC 553 Final Project
// Reference: prior 424 project on Hackster
// https://www.hackster.io/team-6/lane-keeping-rc-car-ebdd08
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/timekeeping.h>
#include <linux/ktime.h>
#include <linux/moduleparam.h>
#include <linux/of.h>
// irq number for encoder interrupt
static int irq_number;
// gpio descriptor for encoder OUT pin
static struct gpio_desc *my_enc;
// timestamp of last accepted pulse (ns)
static u64 prev_time_ns;
// most recent pulse-to-pulse period (ns), exposed via sysfs
static unsigned long encoder_period_ns;
module_param(encoder_period_ns, ulong, 0444);
MODULE_PARM_DESC(encoder_period_ns, "Latest encoder pulse-to-pulse period in ns");
// total accepted pulses, exposed via sysfs (lets userspace detect a stopped wheel)
static unsigned long encoder_count;
module_param(encoder_count, ulong, 0444);
MODULE_PARM_DESC(encoder_count, "Number of accepted encoder pulses");
// minimum interval between accepted pulses (debounce threshold)
static unsigned long debounce_ns = 1000000; // 1 ms
module_param(debounce_ns, ulong, 0644);
MODULE_PARM_DESC(debounce_ns, "Minimum valid time between pulses in ns");
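// e.g. with the default 1 ms debounce, at most 1000 pulses/s are accepted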
// interrupt service routine fires on rising edge of encoder pin
static irqreturn_t gpio_irq_handler(int irq, void *dev_id)
{
u64 curr_time_ns;
u64 elapsed_ns;
int pin_val;
// read current pin state and current time
pin_val = gpiod_get_value(my_enc);
curr_time_ns = ktime_get_ns();
// time since previous accepted pulse
elapsed_ns = curr_time_ns - prev_time_ns;
// debounce: ignore pulses too close to previous valid one
if (pin_val == 1 && elapsed_ns > debounce_ns) {
// accept this pulse: update timestamp, period, and count
prev_time_ns = curr_time_ns;
encoder_period_ns = (unsigned long)elapsed_ns;
encoder_count++;
}
return IRQ_HANDLED;
}
// probe function called when DTS compatible string matches
static int enc_probe(struct platform_device *pdev)
{
struct device *dev = &pdev->dev;
int ret;
printk(KERN_INFO "enc_probe - RUNNING\n");
// get encoder gpio from device tree (DTS property "encoder-gpios")
my_enc = devm_gpiod_get(dev, "encoder", GPIOD_IN);
// error check
if (IS_ERR(my_enc)) {
printk(KERN_ERR "enc_probe - could not get encoder GPIO\n");
return PTR_ERR(my_enc);
}
// map encoder gpio to irq num
irq_number = gpiod_to_irq(my_enc);
// error check
if (irq_number < 0) {
printk(KERN_ERR "enc_probe - could not map GPIO to IRQ\n");
return irq_number;
}
// reset
prev_time_ns = ktime_get_ns();
encoder_period_ns = 0;
encoder_count = 0;
// register isr on rising edge
ret = request_irq(
irq_number,
gpio_irq_handler,
IRQF_TRIGGER_RISING,
"speed_encoder_irq",
pdev
);
// error check
if (ret != 0) {
printk(KERN_ERR "Error! Cannot request interrupt number: %d\n", irq_number);
return ret;
}
printk(KERN_INFO "Speed Encoder Insert Successful. IRQ=%d\n", irq_number);
return 0;
}
// remove function called when module unloads
static void enc_remove(struct platform_device *pdev)
{
// free the irq and print a message
free_irq(irq_number, pdev);
printk(KERN_INFO "Speed Encoder Removed.\n");
}
// match table must match compatible string in encoder.dts
static struct of_device_id matchy_match[] = {
{ .compatible = "custom,speed_encoder" },
{ /* leave alone - keep this here (end node) */ }
};
MODULE_DEVICE_TABLE(of, matchy_match);
// platform driver object
static struct platform_driver speed_encoder = {
.probe = enc_probe,
.remove = enc_remove,
.driver = {
.name = "speed_encoder",
.owner = THIS_MODULE,
.of_match_table = matchy_match,
},
};
module_platform_driver(speed_encoder);
MODULE_DESCRIPTION("ELEC 553 Final Project - Optical Speed Encoder Driver");
MODULE_AUTHOR("Alex Smith");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:speed_encoder");
/dts-v1/;
/plugin/;
// Device tree overlay for the optical speed encoder
/ {
// Pi 5 SoC compatible string
compatible = "brcm,bcm2712";
fragment@0 {
// root of the device tree
target-path = "/";
__overlay__ {
speed_encoder {
compatible = "custom,speed_encoder";
// Encoder OUT pin connected to GPIO 17 (BCM)
encoder-gpios = <&gpio 17 0>;
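// (flags cell 0 = GPIO_ACTIVE_HIGH)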
};
};
};
};
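For reference, a quick sanity check that the driver is loaded and counting pulses (a sketch, assuming the module is loaded under the name "encoder" so the sysfs paths match those used by the driving program above):

# Sanity-check sketch: confirm the encoder module is loaded and counting.
with open("/sys/module/encoder/parameters/encoder_count") as f:
    print("pulses counted:", f.read().strip())
with open("/sys/module/encoder/parameters/encoder_period_ns") as f:
    print("latest period (ns):", f.read().strip())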