# We took inspiration from the following sources for this code:
# User raja_961, "Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV". Instructables. URL: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# Not Elecs: Autonomous Lane Keeping RC Car. URL: https://www.hackster.io/team-8-not-elecs/autonomous-lane-keeping-rc-car-b52ee7
import time
import RPi.GPIO as GPIO
from gpiozero import Servo
import cv2
import numpy as np
import math
import subprocess
# SERVO SETUP
servo = Servo(19)
# ESC SETUP
ESC_PIN = 18
GPIO.setmode(GPIO.BCM)
GPIO.setup(ESC_PIN, GPIO.OUT)
esc_pwm = GPIO.PWM(ESC_PIN, 50)
esc_pwm.start(0)
throttle_on = False
DUTY_NEUTRAL = 7.5
DUTY_MAX_CAL = 10.0
DUTY_MIN_CAL = 5.0
# Speed control
# The car runs at different speeds depending on how straight it's going.
DUTY_STRAIGHT = 7.78 # throttle when error is small (going straight)
DUTY_TURNING = 7.8 # throttle when turning
# Error threshold (degrees) below which we consider the car to be "going straight"
STRAIGHT_THRESHOLD = 10
# THROTTLE CONTROL
def set_throttle(on, error=0):
"""
Drive forward at a speed based on how straight we're going, or return to neutral if on=False.
"""
global throttle_on
if on:
if abs(error) < STRAIGHT_THRESHOLD:
duty = DUTY_STRAIGHT
else:
duty = DUTY_TURNING
esc_pwm.ChangeDutyCycle(duty)
speed_duty_vals.append(duty) # data logging
else:
esc_pwm.ChangeDutyCycle(DUTY_NEUTRAL)
speed_duty_vals.append(DUTY_NEUTRAL) # data logging
throttle_on = on
# STEERING CONTROL
def set_servo_turn_amt(value):
value = max(-1.0, min(1.0, value))
servo.value = value
steering_cmd_vals.append(value) # data logging
# SPEED ENCODER
TARGET_MS = 80
def get_encoder_ms():
try:
result = subprocess.run(
['cat', '/sys/module/driver/parameters/diff'],
capture_output=True, text=True
)
val = int(result.stdout.strip())
return val if val > 0 else None
except Exception:
return None
# LANE DETECTION
def detect_edges(frame):
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
lower_blue = np.array([90, 120, 0], dtype="uint8")
upper_blue = np.array([120, 255, 255], dtype="uint8")
mask = cv2.inRange(hsv, lower_blue, upper_blue)
edges = cv2.Canny(mask, 50, 100)
cv2.imshow("edges", edges)
return edges
def detect_stop_sign(frame):
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
lower_red1 = np.array([0, 100, 100], dtype="uint8")
upper_red1 = np.array([10, 255, 255], dtype="uint8")
lower_red2 = np.array([160, 100, 100], dtype="uint8")
upper_red2 = np.array([180, 255, 255], dtype="uint8")
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
mask = cv2.bitwise_or(mask1, mask2)
num_red_px = cv2.countNonZero(mask)
print("num_red_px:", num_red_px)
return num_red_px >= 500
def region_of_interest(edges):
height, width = edges.shape
mask = np.zeros_like(edges)
polygon = np.array([[
(0, height),
(0, height // 2),
(width, height // 2),
(width, height),
]], np.int32)
cv2.fillPoly(mask, polygon, 255)
cropped_edges = cv2.bitwise_and(edges, mask)
cv2.imshow("roi", cropped_edges)
return cropped_edges
def detect_line_segments(cropped_edges):
rho = 1
theta = np.pi / 180
min_threshold = 10
line_segments = cv2.HoughLinesP(
cropped_edges, rho, theta, min_threshold,
np.array([]), minLineLength=5, maxLineGap=0
)
return line_segments
def average_slope_intercept(frame, line_segments):
lane_lines = []
if line_segments is None:
return lane_lines
height, width, _ = frame.shape
left_fit = []
right_fit = []
boundary = 1 / 3
left_region_boundary = width * (1 - boundary)
right_region_boundary = width * boundary
for line_segment in line_segments:
for x1, y1, x2, y2 in line_segment:
if x1 == x2:
continue
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - (slope * x1)
if slope < 0:
if x1 < left_region_boundary and x2 < left_region_boundary:
left_fit.append((slope, intercept))
else:
if x1 > right_region_boundary and x2 > right_region_boundary:
right_fit.append((slope, intercept))
if left_fit:
lane_lines.append(make_points(frame, np.average(left_fit, axis=0)))
if right_fit:
lane_lines.append(make_points(frame, np.average(right_fit, axis=0)))
return lane_lines
def make_points(frame, line):
height, width, _ = frame.shape
slope, intercept = line
y1 = height
y2 = int(y1 / 2)
if slope == 0:
slope = 0.1
x1 = int((y1 - intercept) / slope)
x2 = int((y2 - intercept) / slope)
return [[x1, y1, x2, y2]]
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
line_image = np.zeros_like(frame)
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
return cv2.addWeighted(frame, 0.8, line_image, 1, 1)
def get_steering_angle(frame, lane_lines):
height, width, _ = frame.shape
if len(lane_lines) == 2:
_, _, left_x2, _ = lane_lines[0][0]
_, _, right_x2, _ = lane_lines[1][0]
x_offset = (left_x2 + right_x2) / 2 - int(width / 2)
y_offset = int(height / 2)
elif len(lane_lines) == 1:
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
y_offset = int(height / 2)
else:
x_offset = 0
y_offset = int(height / 2)
angle_to_mid_radian = math.atan(x_offset / y_offset)
angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
return angle_to_mid_deg + 90
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
steering_angle_radian = steering_angle / 180.0 * math.pi
x1 = int(width / 2)
y1 = height
x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
return cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
# CAMERA SETUP
def open_camera():
for attempt in range(5):
cap = cv2.VideoCapture(0)
if cap.isOpened():
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
print(f"Camera opened on attempt {attempt+1}")
return cap
print(f"Camera attempt {attempt+1} failed, retrying...")
time.sleep(1)
return None
video = open_camera()
video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
# PD CONTROLLER GAIN
kp = 0.39
kd = 0.04
STEERING_ALPHA = 0.4
last_turn_amt = None
# STOP SIGN STATE
stops = 0
last_stopped = -1
COOLDOWN = 20
# DATA LOGGING
err_vals = []
p_vals = []
d_vals = []
steering_pwm_data = []
speed_duty_vals = []
steering_cmd_vals = []
# MAIN LOOP SETUP
last_error = 0
frame_count = 0
MAX_FRAMES = 2000
# ESC CALIBRATION + ARM
print("ESC ready. Starting self-driving loop. Press Esc to stop.")
last_cycle = time.time()
try:
while frame_count < MAX_FRAMES:
ret, frame = video.read()
if not ret:
print("Frame read failed, attempting reconnect...")
video.release()
time.sleep(1)
video = open_camera()
if video is None:
print("Camera reconnect failed, exiting.")
break
last_cycle = time.time()
last_error = 0
last_turn_amt = None
continue
frame = cv2.flip(frame, 1)
# Lane detection pipeline
edges = detect_edges(frame)
roi = region_of_interest(edges)
line_segments = detect_line_segments(roi)
lane_lines = average_slope_intercept(frame, line_segments)
lane_lines_image = display_lines(frame, lane_lines)
steering_angle = get_steering_angle(frame, lane_lines)
heading_image = display_heading_line(lane_lines_image, steering_angle)
cv2.imshow("heading", heading_image)
# PD steering controller
now = time.time()
dt = now - last_cycle
dt = max(0.001, min(dt, 0.1))
deviation = steering_angle - 90
error = -deviation
proportional = kp * error
derivative = kd * (error - last_error) / dt
err_vals.append(error)
p_vals.append(proportional)
d_vals.append(derivative)
steering_pwm_data.append(proportional + derivative)
# Throttle speed varies with how straight we're going
# Pass current error so set_throttle can pick the right duty cycle.
if not throttle_on:
set_throttle(True, error)
else:
# Update duty cycle every frame even when already on
set_throttle(True, error)
# Stop sign check (every 3rd frame)
if frame_count % 3 == 0:
if detect_stop_sign(frame):
now_t = time.time()
if last_stopped == -1 or (now_t - last_stopped) > COOLDOWN:
time.sleep(1)
stops += 1
set_throttle(False)
last_stopped = now_t
print(f"STOP #{stops} detected!")
if stops == 1:
print("Waiting 3 seconds then resuming...")
time.sleep(3)
set_throttle(True, 0)
elif stops >= 2:
print("Final stop reached. Exiting.")
break
last_cycle = time.time()
last_error = 0
last_turn_amt = None
continue
# Compute and apply steering
raw_turn = proportional + derivative
if abs(error) < 5:
raw_turn = 0
raw_turn = max(-1.0, min(0.5, raw_turn))
if last_turn_amt is None:
last_turn_amt = raw_turn
turn_amt = STEERING_ALPHA * last_turn_amt + (1 -
STEERING_ALPHA) * raw_turn
last_turn_amt = turn_amt
speed_mode = "STRAIGHT" if abs(
error) < STRAIGHT_THRESHOLD else "TURNING"
print(
f"Frame: {frame_count} | Err: {error:.1f} | Turn: {turn_amt:.3f} | Speed: {speed_mode}"
)
set_servo_turn_amt(turn_amt * 1.28)
last_error = error
last_cycle = now
frame_count += 1
key = cv2.waitKey(1)
if key == 27:
break
finally:
print("Shutting down...")
set_throttle(False)
set_servo_turn_amt(0)
esc_pwm.ChangeDutyCycle(DUTY_NEUTRAL)
esc_pwm.stop()
GPIO.cleanup()
video.release()
cv2.destroyAllWindows()
# Verify all lists are the same length before plotting
# We use the minimum length to avoid "x and y must have same first dimension" errors
min_len = min(len(err_vals), len(steering_cmd_vals), len(speed_duty_vals), len(p_vals))
frames = np.arange(min_len)
d_err = np.array(err_vals[:min_len])
d_steer = np.array(steering_cmd_vals[:min_len])
d_speed = np.array(speed_duty_vals[:min_len])
print(f"Plotting {min_len} data points...")
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# Plot 1: Error + Steering + Speed
plt.figure(figsize=(10, 6))
plt.plot(frames, d_err, label="Error (deg)")
plt.plot(frames, d_steer, label="Steering Cmd (-1 to 1)")
plt.plot(frames, d_speed, label="Speed Duty Cycle")
plt.xlabel("Frame")
plt.ylabel("Value")
plt.legend()
plt.title("Vehicle Control State")
plt.grid(True)
plt.savefig("plot1.png")
plt.close()
# Plot 2: PD Breakdown
plt.figure(figsize=(10, 6))
plt.plot(frames, d_err, label="Error", alpha=0.5)
plt.plot(frames, p_vals[:min_len], label="Proportional (Kp)")
plt.plot(frames, d_vals[:min_len], label="Derivative (Kd)")
plt.xlabel("Frame")
plt.legend()
plt.title("PID Controller Performance")
plt.grid(True)
plt.savefig("plot2.png")
plt.close()
print("Plots saved successfully as plot1.png and plot2.png")
Comments