Zane ManningAidan ColonSurina KEmmanuella Desruisseaux
Published

Neglect

An autonomous RC car that uses a Raspberry Pi 5 to perform lane-keeping along a track, via steering and speed control.

IntermediateShowcase (no instructions)313
Neglect

Things used in this project

Hardware components

RC Car (w/ DC Motor and Servo Motor)
With DC motor and Servo motor (to move and steer)
×1
Raspberry Pi 5
Raspberry Pi 5
×1
Optical Speed Encoder
×1
Portable Charger
×1

Software apps and online services

OpenCV
OpenCV

Hand tools and fabrication machines

Tape, Duct
Tape, Duct

Story

Read more

Code

lanekeeping.py

Python
Main python script
"""
This file makes use of code from the following sources:

User raja_961, Autonomous Lane-Keeping Car Using Raspberry
Pi and OpenCV. Instructables. URL:
https://www.instructables.com/Autonomous-Lane-Keeping-Car-U
sing-Raspberry-Pi-and/

Big Brains - ELEC 424 Final Project - Team 11. URL:
https://www.hackster.io/big-brains/big-brains-elec-424-final-project-team-11-53c862

"""
import time
import math
import RPi.GPIO as GPIO
import numpy as np
import cv2

import matplotlib.pyplot as plt  # pip install matplotlib


GPIO.setmode(GPIO.BCM)

# ESC parameters
speedPin = 18
speed_pwm_hz = 50
speed_dc = 7.5
GPIO.setup(speedPin, GPIO.OUT)
speed = GPIO.PWM(speedPin, speed_pwm_hz)
speed.start(speed_dc)
# ESC speed control parameters
START_SPEED = 7.72
MAX_SPEED = 7.9
MIN_SPEED = 7.7

# Steering parameters
steeringPin = 19
steering_pwm_hz = 50
steering_dc = 7.5
steering_right_dc = 9
steering_left_dc = 6
# Set up steering GPIO
GPIO.setup(steeringPin, GPIO.OUT)
steering = GPIO.PWM(steeringPin, steering_pwm_hz)
steering.start(steering_dc)
lastSteeringAngle = 90

# Optical encoder parameters
ELAPSED_MS_PATH = "/sys/module/gpiod_driver/parameters/elapsed_ms"

# Video parameters
ESC_KEY_CODE = 27
VIDEO_DEVICE_IDX = 0
VIDEO_WIDTH = 160
VIDEO_HEIGHT = 120

# PD variables
kp = 0.1  #started w 0.01
kd = kp * 0.05
lastTime = 0
lastError = 0

# arrays for making the final graphs
p_vals = []
d_vals = []
err_vals = []
speed_pwm = []
steer_pwm = []

# Steering control ############################################################
def set_steering(p, steering):
    p.ChangeDutyCycle(steering)

# Reset steering to center position
def reset_steering(p):
    p.ChangeDutyCycle(7.5)

# Map a value from one range to another
def map_value(value, in_min, in_max, out_min, out_max):
    # Clamp input to avoid out-of-bound servo values
    value = max(min(value, in_max), in_min)
    return out_min + (float(value - in_min) / (in_max - in_min)) * (out_max - out_min)

###############################################################################

# ESC control #################################################################
def isRedFloorVisible(frame):
    # Convert the frame to HSV color space
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Define the lower and upper bounds for red color in HSV
    lower_red1 = np.array([0, 120, 70])
    upper_red1 = np.array([10, 255, 255])
    lower_red2 = np.array([170, 120, 70])
    upper_red2 = np.array([180, 255, 255])
    # Create masks for red color
    mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
    mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
    mask = cv2.bitwise_or(mask1, mask2)
    # Apply morphological operations to clean up the mask
    output = cv2.bitwise_and(frame, frame, mask=mask)
    percentage_red = np.count_nonzero(mask) * 100 / np.size(mask)
    return percentage_red > 5  


###############################################################################

# ESC speed control ###########################################################
# Set the speed of the ESC
def set_esc(p, speed):
    global speed_dc
    speed_dc = speed
    p.ChangeDutyCycle(speed)

# Reset the ESC to a neutral position - stop it
def reset_esc(p):
    p.ChangeDutyCycle(7.5)

# Calibrate the ESC by setting it to full throttle and then neutral
def calibrate_esc(p):
    print('Calibrating ESC...')
    p.ChangeDutyCycle(10)
    time.sleep(2)
    p.ChangeDutyCycle(5)
    time.sleep(2)
    # Reset ESC to neutral position
    reset_esc(p)
    print('ESC calibration complete')

# Speed control functions ##################################################
def accelerate():
    """
    Adjusts the speed parameters to increase velocity, if not at peak capacity.

    Returns:
        None: Provides update on velocity adjustments.
    """
    global speed_dc
    global MAX_SPEED
    
    if speed_dc < MAX_SPEED:
        speed_dc = min(speed_dc + 0.025, MAX_SPEED)  # Enhance speed
        set_esc(speed, speed_dc)
        print(f"Velocity increment: Now at {speed_dc}")
    else:
        print("Peak velocity achieved.")


def decelerate():
    """
    Modulates the speed parameters to reduce velocity, unless fully stopped.

    Returns:
        None: Delivers update on velocity reduction.
    """
    global speed_dc
    global MIN_SPEED
    # Ensure speed does not go below the minimum threshold
    if speed_dc > MIN_SPEED:
        speed_dc = max(speed_dc - 0.025, MIN_SPEED)  # Reduce speed
        set_esc(speed, speed_dc)
        print(f"Velocity decrement: Now at {speed_dc}")
    else:
        print("Complete standstill reached.")

###############################################################################

# Image processing functions ##################################################
def detect_edges(frame):
    # blue color detection
    # blurred = cv2.GaussianBlur(frame, (5,5), 0)
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # cv2.imshow("HSV",hsv)
    lower_blue = np.array([100,120,50], dtype="uint8")
    upper_blue = np.array([140,255,255], dtype="uint8")
    mask = cv2.inRange(hsv,lower_blue,upper_blue) # this mask will filter out everything but blue
    
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5,5))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=2)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN,  kernel, iterations=1)

    #Do GaussianBlur using mask instead of frame
    blurred = cv2.GaussianBlur(mask, (5,5), 0)

    # cv2.imshow("mask",mask)

    # detect edges
    edges = cv2.Canny(mask, 50, 100) 
    # cv2.imshow("edges",edges)
    return edges


def region_of_interest(edges):
    height, width = edges.shape # extract the height and width of the edges frame
    mask = np.zeros_like(edges) # make an empty matrix with same dimensions of the edges frame

    # only focus lower half of the screen
    # specify the coordinates of 4 points (lower left, upper left, upper right, lower right)
    polygon = np.array([[
        (0, height), 
        (0,  height/2),
        (width , height/2),
        (width , height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255) # fill the polygon with blue color 
    cropped_edges = cv2.bitwise_and(edges, mask) 
    # cv2.imshow("roi",cropped_edges)
    return cropped_edges

# Detect line segments using Hough Transform 
def detect_line_segments(cropped_edges):
    rho = 1  
    theta = np.pi / 180  
    min_threshold = 10 
    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold, 
                                    np.array([]), minLineLength=5, maxLineGap=0)
    return line_segments

# Average slope and intercept of the detected line segments
def average_slope_intercept(frame, line_segments):
    lane_lines = []

    if line_segments is None:
        # print("no line segment detected")
        return lane_lines

    # Initialize variables
    height, width,_ = frame.shape
    left_fit = []
    right_fit = []
    boundary = 1/3
    # Define the boundaries for left and right regions
    left_region_boundary = width * (1 - boundary) 
    right_region_boundary = width * boundary 
    
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                # print("skipping vertical lines (slope = infinity)")
                continue
            # Calculate the slope and intercept of the line segment
            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)
            # check if the slope is within a reasonable range
            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))
    # find the average slope and intercept for left and right lane lines
    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))
    # if no left lane line detected, use the last known left lane line
    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    # lane_lines is a 2-D array consisting the coordinates of the right and left lane lines
    # for example: lane_lines = [[x1,y1,x2,y2],[x1,y1,x2,y2]]
    # where the left array is for left lane and the right array is for right lane 
    # all coordinate points are in pixels
    return lane_lines

# Make points for the lane lines based on the slope and intercept
def make_points(frame, line):
    height, width, _ = frame.shape
    slope, intercept = line
    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down
    # check if the slope is too small, if so, set it to a small value
    if slope == 0: 
        slope = 0.1    
    # Calculate the x-coordinates of the points based on the slope and intercept
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]

# display lines on the frame
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6): # line color (B,G,R)
    line_image = np.zeros_like(frame)
    # If no lines are detected, return the original frame
    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2), line_color, line_width)
    # Blend the line image with the original frame
    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)  
    return line_image

# Get the steering angle based on the detected lane lines
def get_steering_angle(frame, lane_lines):
    height, width, _ = frame.shape
    # Calculate the steering angle based on the detected lane lines
    if len(lane_lines) == 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)
    # If only one lane line is detected, use its x-coordinates to calculate the offset
    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)
    # If no lane lines are detected, set offsets to zero
    elif len(lane_lines) == 0:
        x_offset = 0
        y_offset = int(height / 2)
    # Calculate the steering angle based on the offsets
    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    steering_angle = angle_to_mid_deg + 90
    
    return steering_angle

# Display a heading line on the frame based on the steering angle
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape
    # Calculate the end points of the heading line based on the steering angle
    steering_angle_radian = steering_angle / 180.0 * math.pi
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)
    # Draw the heading line on the heading image
    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)

    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image

###############################################################################

# Plotting functions ##########################################################
def plot_pd(p_vals, d_vals, error, show_img=False):
    # Plot the PD
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals))
    ax1.plot(t_ax, p_vals, '-', label="P values")
    ax1.plot(t_ax, d_vals, '-', label="D values")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")
    # set the limits and labels
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error Value")
    # set the title and legend
    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()
    plt.savefig("pd_plot.png")
    
    if show_img:
        plt.show()
    plt.clf()

# Plot the PWM values for speed and steering
def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    # Plot the PWM
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms))
    ax1.plot(t_ax, speed_pwms, '-', label="Speed PWM")
    ax1.plot(t_ax, turn_pwms, '-', label="Steering PWM")
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")
    # set the limits and labels
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PWM Values")
    ax2.set_ylabel("Error Value")
    # set the title and legend
    plt.title("PWM Values over time")
    fig.legend()
    plt.savefig("pwm_plot.png")
    
    if show_img:
        plt.show()
    plt.clf()

###############################################################################

# Set up GPIO
calibrate_esc(speed)

# Set up video stream
print("Initializing video stream...")
video = cv2.VideoCapture(VIDEO_DEVICE_IDX)
video.set(cv2.CAP_PROP_FRAME_WIDTH, VIDEO_WIDTH)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, VIDEO_HEIGHT)
print("Video stream initialized.")
time.sleep(3)   # Allow time for camera window to open

# Start moving slowly
set_esc(speed, START_SPEED)

# TODO: Detect stop signs periodically
frame_counter = 0
# Booleans for handling stop light
passedFirstStopSign = False
secondStopSignTick = 0
# See if we've check the stop sign
stopSignCheck = 10

# The loop
while True:
    # print("speed_dc: ", speed_dc)
    ret, frame = video.read()
    cv2.imshow("original", frame)

    # Image processing for steering
    edges = detect_edges(frame)
    roi = region_of_interest(edges)
    line_segments = detect_line_segments(roi)
    '''
    #5/1/25 Find the center between the lanes instead of tracking one
    if line_segments is not None and len(line_segments) >= 2:
        left_lane = min([line[0] for line in line_segments]) #left lane position
        right_lane = max([line[2] for line in line_segments]) #right lane position
        center_x = (left_lane + right_lane) / 2 #get center between lanes
        steering_angle = get_steering_center(center_x, frame.shape[1]) #use frame width
    else:
        steering_angle = lastSteeringAngle
    '''
    # lastSteeringAngle = steering_angle #save steering angle
    lane_lines = average_slope_intercept(frame,line_segments)
    lane_lines_image = display_lines(frame,lane_lines)
    steering_angle = get_steering_angle(frame, lane_lines)
    heading_image = display_heading_line(lane_lines_image,steering_angle)
    cv2.imshow("heading", heading_image)

    # check for stop signs
    frame_counter += 1
    if frame_counter % stopSignCheck == 0:
        if isRedFloorVisible(frame):
            print("Red floor detected")
            # If we haven't passed the first stop sign, stop
            if not passedFirstStopSign:
                time.sleep(1)
                print("Stopping at 1st red")
                reset_esc(speed)
                time.sleep(4)
                passedFirstStopSign = True

                # Delay subsequent stop sign checks for some number of frames
                secondStopSignTick = frame_counter + 500

                set_esc(speed, START_SPEED)
            # If we have passed the first stop sign, wait for a bit
            elif frame_counter >= secondStopSignTick:
                time.sleep(1)
                print("Stopping at 2nd red")
                reset_esc(speed)
                break

    # PD controller calculations
    now = time.time()
    dt = now - lastTime

    deviation = steering_angle - 90  # deviation from non-turned wheels

    # Error and PD adjustment
    error = -deviation
    base_turn = 0.0
    proportional = kp * error
    derivative = kd * (error - lastError) / dt
    
    # Data for graphing
    p_vals.append(proportional)
    d_vals.append(derivative)
    err_vals.append(error)
    
    # Steering adjustments
    turn_amt = base_turn + proportional + derivative

    # Adjust steering based on calculated turn amount
    turn_amt_mapped = map_value(
        turn_amt,
        in_min=-2.0,                        # raw minimum
        in_max=2.0,                         # raw maximum
        out_min=steering_right_dc,          # servo left
        out_max=steering_left_dc,           # servo right
    )
    set_steering(steering, turn_amt_mapped)
    print(f"PD error: {turn_amt}, steering duty cycle: {turn_amt_mapped}")

    # Speed and steering updates for graphs
    steer_pwm.append(turn_amt_mapped)
    speed_pwm.append(speed_dc)

    # Read encoder data for speed adjustments
    time_diff_ms = 2
    with open(ELAPSED_MS_PATH, "r") as r:
        time_diff_ms = int(r.read())

    if (time_diff_ms != 0):
        print("time_diff: ", time_diff_ms)
    
    # Adjust speed based on encoder feedback
    if time_diff_ms >= 17:
        # Accelerate
        accelerate()
    elif 5 < time_diff_ms < 17:
        # Decelerate
        decelerate()
        
    # Update error values for PD control
    lastError = error
    lastTime = time.time()

    # Wait to exit
    key = cv2.waitKey(1)
    if key == ESC_KEY_CODE:
      break

# Cleanup
video.release()
cv2.destroyAllWindows()
reset_esc(speed)
reset_steering(steering)
GPIO.cleanup()

# Plot that shit
plot_pd(p_vals, d_vals, err_vals, True)
plot_pwm(speed_pwm, steer_pwm, err_vals, True)

gpiod_driver.c

C/C++
Driver code for optical encoder
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/of_device.h>
#include <linux/platform_device.h>
#include <linux/gpio/consumer.h>
#include <linux/interrupt.h>
#include <linux/ktime.h>
#include <linux/fs.h>
#include <linux/device.h>
#include <linux/mod_devicetable.h>
#include <linux/moduleparam.h>

// Define the module parameters
static struct gpio_desc *button_gpio;
static unsigned int irq_number;
static ktime_t last_time;
static volatile s64 last_period_ns;  //nanoseconds
static int elapsed_ms = 0;
module_param(elapsed_ms, int, 0644);

// sysfs attribute to show last measured period
static ssize_t speed_show(struct device *dev, struct device_attribute *attr, char *buf)
{
    // Last year code resets to 0 when idle but idt we have to do that 
    return sprintf(buf, "%lld\n", last_period_ns);
}

static DEVICE_ATTR_RO(speed);

// Interrupt handler
static irqreturn_t button_irq_handler(int irq, void *dev_id)
{
    ktime_t now = ktime_get();
    last_period_ns = ktime_to_ns(ktime_sub(now, last_time));
    elapsed_ms = (int)(last_period_ns / 1000000);
    last_time = now;
    pr_info("gpiod_driver: Detected encoder pulse, period = %lld ns\n", last_period_ns);
    return IRQ_HANDLED;
}

// Probe function
static int encoder_probe(struct platform_device *pdev)
{
    struct device *dev = &pdev->dev;
    int ret;

    dev_info(dev, "gpiod_driver: Probing device...\n");

    // Get GPIO from device tree
    button_gpio = devm_gpiod_get(dev, "button", GPIOD_IN);
    if (IS_ERR(button_gpio)) {
        dev_err(dev, "gpiod_driver: Failed to get button GPIO\n");
        return PTR_ERR(button_gpio);
    }

    // Get IRQ number from GPIO
    irq_number = gpiod_to_irq(button_gpio);
    if (irq_number < 0) {
        dev_err(dev, "gpiod_driver: Failed to get IRQ number\n");
        return irq_number;
    }

    // Request IRQ
    // Use IRQF_TRIGGER_FALLING to detect falling edge of the signal
    ret = devm_request_irq(dev, irq_number, button_irq_handler, IRQF_TRIGGER_FALLING, "encoder_irq", NULL);
    if (ret) {
        dev_err(dev, "gpiod_driver: Failed to request IRQ\n");
        return ret;
    }

    // Initialize last_time to current time
    last_time = ktime_get();

    // Create sysfs entry for speed
    ret = device_create_file(dev, &dev_attr_speed);
    if (ret) {
        dev_err(dev, "gpiod_driver: Failed to create sysfs entry\n");
        return ret;
    }
    // Set initial period to 0
    dev_info(dev, "gpiod_driver: Probe successful\n");
    return 0;
}

// Remove function
static void encoder_remove(struct platform_device *pdev)
{
    struct device *dev = &pdev->dev;

    device_remove_file(dev, &dev_attr_speed);
    free_irq(irq_number, NULL);

    dev_info(dev, "gpiod_driver: Driver removed\n");
}

// Device tree match table
static const struct of_device_id encoder_of_match[] = {
    { .compatible = "fun_overlay" },
    {},
};
MODULE_DEVICE_TABLE(of, encoder_of_match);

// Platform driver structure
static struct platform_driver encoder_driver = {
    .probe = encoder_probe,
    .remove = encoder_remove,
    .driver = {
        .name = "slay_device_driver",
        .of_match_table = encoder_of_match,
        .owner = THIS_MODULE,
    },
};

// Module initialization and cleanup
module_platform_driver(encoder_driver);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("ELEC 424 Team 15");
MODULE_DESCRIPTION("Driver for RC Car");

dt_overlay.dts

C/C++
Overlay file - modified device tree source file
/dts-v1/;
/plugin/;

/ {
    fragment@0 {
        target-path  = "/";
        __overlay__ {
            slay_device { //new name
                compatible = "fun_overlay";
                led-gpios = <&gpio 5 0>;  // GPIO 5
                button-gpios = <&gpio 6 0>;  // GPIO 6 
            };
        };
    };
};

Credits

Zane Manning
1 project • 0 followers
Aidan Colon
1 project • 0 followers
Surina K
1 project • 0 followers
Emmanuella Desruisseaux
1 project • 0 followers
Thanks to Team Big Brains and Raja_961.

Comments