# ==============================================================================
# ELEC 553 Final Project - Main Autonomous Driving Script
#
# This script handles the main autonomous loop for the RPi 5 RC Car.
# It reads camera frames to perform HSV color filtering for lane keeping (blue) and stop box detection (red).
# It utilizes a PD controller for smooth steering, and reads hardware encoder interrupts via a sysfs file to
# dynamically maintain a constant motor speed.
#
# Citation: User raja_961, "Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV"
# URL: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# ==============================================================================
import cv2
import numpy as np
import lgpio
import time
import csv
#
# HARDWARE PIN SETUP
#
STEER_PIN = 19
MOTOR_PIN = 18
GPIO_CHIP = 4 # RPi 5 utilizes gpiochip4 for its hardware IO
#
# PWM & ESC CALIBRATION CONSTANTS
#
# Duty cycles: 7.5 = Neutral, >7.5 = Forward/Right, <7.5 = Reverse(motor doesnt seem to go backwards)/Left
STEER_CENTER = 7.5
STEER_MIN = 6.2 # Physical left limit
STEER_MAX = 8.8 # Physical right limit
MOTOR_STOP = 7.5
BASE_SPEED = 8.0 # baseline forward speed
PWM_FREQ = 50 # 50 Hz standard for RC servos and ESCs
#
# PD CONTROLLER GAINS (Steering)
#
# Higher kp -> sharper turns.
Kp = 0.02
# Higher kd => less oscillation/shaking.
Kd = 0.002
#
# STEERING SMOOTHING (Low-Pass Filter)
#
# 0.0 -> Instant response;
# 1.0 -> Completely ignores new calculations
SMOOTH = 0.6
#
# ENCODER & CRUISE CONTROL CONSTANTS
#
# We read the custom kernel driver's output from the sysfs virtual file system
ENCODER_SYSFS = "/sys/module/encoder_driver/parameters/encoder_rate" # note that the vfs gives rates not intervals
TARGET_INTERVAL = 9 # Target milliseconds between encoder ticks
SPEED_STEP = 0.003 # Step size to adjust PWM to maintain target speed
#
# lgpio INITIALIZATION
#
gpio_handle = lgpio.gpiochip_open(GPIO_CHIP)
lgpio.gpio_claim_output(gpio_handle, STEER_PIN)
lgpio.gpio_claim_output(gpio_handle, MOTOR_PIN)
# Helper function to cleanly send PWM signals to a specified pin."""
def set_pwm(pin, duty_cycle):
lgpio.tx_pwm(gpio_handle, pin, PWM_FREQ, duty_cycle)
# Initialize both to neutral
set_pwm(STEER_PIN, STEER_CENTER)
set_pwm(MOTOR_PIN, MOTOR_STOP)
print("Waiting for ESC to initialize...")
time.sleep(2)
#
# CAMERA INITIALIZATION
#
cap = cv2.VideoCapture(0, cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 240)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 180)
if not cap.isOpened():
print("Camera failed to open!")
lgpio.gpiochip_close(gpio_handle)
exit()
#
# DATA LOGGING SETUP
#
LOG_FILE = "/home/pi/run_data.csv"
# Initializes the CSV file with column headers for graphing later.
def init_csv():
with open(LOG_FILE, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(
[
"Frame",
"Error",
"Steer_PWM",
"Motor_PWM",
"Prop_Response",
"Deriv_Response",
"Encoder_Interval_ms",
]
)
# Appends a single frame's calculated states to the CSV file.
def log_row(frame_num, error, steer_pwm, motor_pwm, prop, deriv, enc):
with open(LOG_FILE, "a", newline="") as f:
writer = csv.writer(f)
writer.writerow(
[
frame_num,
round(error, 3) if error is not None else 0,
round(steer_pwm, 4),
round(motor_pwm, 4),
round(prop, 4),
round(deriv, 4),
enc,
]
)
#
# HARDWARE ENCODER READER & SPEED CONTROL
#
# Helper to get the current millisecond interval between encoder ticks from the kernel driver.
def read_encoder():
try:
with open(ENCODER_SYSFS, "r") as f:
return 1000 / int(f.read().strip()) # Convert rate (ticks/sec) to interval (ms)
except:
return 0 # If the driver isn't loaded or throws an error, return 0 to hold speed
# Dynamically adjusts the motor's duty cycle to maintain a constant physical speed regardless of battery drain or track friction.
def adjust_speed(current_dc, interval):
# Ignore noises
if interval == 0 or interval < 1:
return current_dc
# Adjust towards the target interval
# too slow
if interval > TARGET_INTERVAL:
new_dc = current_dc + SPEED_STEP
# too fast
elif interval < TARGET_INTERVAL:
new_dc = current_dc - SPEED_STEP
else:
new_dc = current_dc
# Clamp between 8.0 and 8.2 so the speed is stable
return max(8.0, min(8.2, new_dc))
#
# LANE DETECTION
#
# Processes the camera frame to find the blue tape, calculates the center
# of the lane, and returns the pixel error from the camera's center.
def get_lane_error(frame):
# Convert from BGR to HSV
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# Define blue range
lower_blue = np.array([80, 30, 30])
upper_blue = np.array([155, 255, 255])
mask = cv2.inRange(hsv, lower_blue, upper_blue)
# Apply morphological opening to remove random background noise
kernel = np.ones((5, 5), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
img_h, img_w = mask.shape
# We sample the image at 5 specific horizontal rows rather than processing the whole image.
# The lower rows are physically closer to the car, so we give them a higher mathematical weight.
rows = [
int(img_h * 0.55),
int(img_h * 0.65),
int(img_h * 0.75),
int(img_h * 0.85),
int(img_h * 0.92),
]
weights = [0.8, 1.0, 1.2, 1.5, 1.8]
centers = []
# Calculate the center of the blue pixels along each sampled row
for i, r in enumerate(rows):
strip = mask[r : r + 4, :]
_, xs = np.where(strip > 0)
if len(xs) > 10: # Ensure there is enough blue to constitute a valid line
centers.append((np.mean(xs), weights[i]))
blue_pixels = cv2.countNonZero(mask)
if not centers:
return None, blue_pixels
# Calculate the overall weighted average center of the lane
weighted_sum = sum(c * wt for c, wt in centers)
total_weight = sum(wt for _, wt in centers)
lane_center = weighted_sum / total_weight
# Calculate error: Positive error = lane is to the right = car needs to steer right
error = lane_center - (img_w / 2)
# Clamp the error to prevent extreme steering values if the camera glitches
error = max(-90.0, min(90.0, error))
return error, blue_pixels
#
# STOP BOX DETECTION
#
def is_red_stop(frame):
"""Checks the bottom portion of the frame for the red stop box."""
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# Define red range
mask1 = cv2.inRange(hsv, np.array([9, 36, 149]), np.array([24, 147, 209]))
mask2 = cv2.inRange(hsv, np.array([0, 13, 000]), np.array([11, 250, 255]))
red_mask = mask1 | mask2
img_h, img_w = red_mask.shape
# Define a ROI
roi = red_mask[int(img_h * 0.10) : int(img_h * 0.95), :]
# Calculate the percentage of red pixels in the ROI
ratio = cv2.countNonZero(roi) / (roi.shape[0] * roi.shape[1])
# If larger than threshold, we have a red stop sign
return ratio > 0.15
# ==============================================================================
# DRIVING LOOP (MAIN LOOP)
# ==============================================================================
# Variables for status
last_error = 0.0
last_steer = STEER_CENTER
passed_first_stop = False
last_stop_time = 0
current_motor = BASE_SPEED
frame_num = 0
init_csv()
try:
print("Starting car will move now!")
set_pwm(MOTOR_PIN, BASE_SPEED)
while True:
ret, frame = cap.read()
if not ret:
print("Camera read failed")
break
frame_num += 1
# stop Box Check
if is_red_stop(frame):
# Prevent from immediately stopping again on the same box
if time.time() - last_stop_time > 3:
print(f"[{frame_num}] RED STOP detected")
log_row(frame_num, 0, STEER_CENTER, MOTOR_STOP, 0, 0, 0)
# Brake gradually over a few hundred milliseconds.
for brake_dc in [7.8, 7.6, 7.5]:
set_pwm(MOTOR_PIN, brake_dc)
time.sleep(0.1)
# Stop
set_pwm(MOTOR_PIN, MOTOR_STOP)
set_pwm(STEER_PIN, STEER_CENTER)
# Stop at the first box, terminate at the second box.
if not passed_first_stop:
time.sleep(3)
passed_first_stop = True
last_stop_time = time.time()
set_pwm(MOTOR_PIN, BASE_SPEED)
current_motor = BASE_SPEED
print("Resuming after first stop...")
time.sleep(0.5)
else:
# If at final stop, break the loop
print("Final stop. Done.")
break
# Lane Detection & Steering Calculation
error, blue_px = get_lane_error(frame)
if error is None:
# If the camera completely loses the lane, hold the wheels straight
steer_val = STEER_CENTER
prop = 0.0
deriv = 0.0
else:
# Proportional term
prop = Kp * error
# Derivative term
deriv = Kd * (error - last_error)
# Note: Subtracting because a positive error (lane on right) requires a larger PWM (steer right)
raw_steer = STEER_CENTER - (prop + deriv)
raw_steer = max(STEER_MIN, min(STEER_MAX, raw_steer))
# Apply a low-pass filter to smooth the servo movement
steer_val = SMOOTH * last_steer + (1 - SMOOTH) * raw_steer
# Update state variables
last_steer = steer_val
last_error = error
# Execute the calculated steering angle
set_pwm(STEER_PIN, steer_val)
# Motor Speed Adjustment
enc_interval = read_encoder()
current_motor = adjust_speed(current_motor, enc_interval)
set_pwm(MOTOR_PIN, current_motor)
# Log to file and print data
log_row(frame_num, error, steer_val, current_motor, prop, deriv, enc_interval)
print(
f"[{frame_num}] Err:{error} Steer:{steer_val:.2f} "
f"Motor:{current_motor:.2f} EncRate:{1000 // enc_interval if enc_interval != 0 else None} Blue:{blue_px}"
)
cv2.waitKey(1)
time.sleep(0.02)
except KeyboardInterrupt:
print("\nStopped by user via Keyboard Interrupt")
finally:
# ==============================================================================
# CLEANUP BLOCK
# this block ensures the hardware does not run away uncontrollably
# ==============================================================================
set_pwm(MOTOR_PIN, MOTOR_STOP)
set_pwm(STEER_PIN, STEER_CENTER)
time.sleep(0.5)
# Stop the PWM
lgpio.tx_pwm(gpio_handle, MOTOR_PIN, 0, 0)
lgpio.tx_pwm(gpio_handle, STEER_PIN, 0, 0)
# Release gpio
lgpio.gpiochip_close(gpio_handle)
cap.release()
print(f"Cleanup done. {frame_num} frames logged to CSV.")
Comments