Matthew ParkerGabriela FrancoelliecchenMatt Gibson
Published © GPL3+

PiRobot

Self driving vehicle running on a BBAI64 with computer vision!

ExpertShowcase (no instructions)Over 4 days230
PiRobot

Things used in this project

Hardware components

BeagleBone AI-64
BeagleBoard.org BeagleBone AI-64
This can barely run on a laptop. It crashes probably from trying to draw too much current all the time!
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
USB WiFi adapter
×1
Optical speed encoder
×1
Portable charger
×1

Software apps and online services

OpenCV
OpenCV

Story

Read more

Code

LaneKeepingAlgorithm.py

Python
'''
Team: PiRobot
Authors: Matthew Gibson, Matthew Parker, Ellie Chen, Gabriela Franco
Final Project: Autonomous RC Car
Fall 2023

The following code is based on: 
User raja_961, Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV. Instructables. 
URL: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/ 

As well as the following hackster project:
https://www.hackster.io/barely-beagle/barely-beagle-6f4353

Which based its code on this hackster project:
https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992
'''

import cv2
import numpy as np
import matplotlib.pyplot as plt
import math
import sys
import signal
import keyboard
import time
import pwm #custom lib
from pid import Error_PID_Controller

def define_globals():
    # Global variables for pin configurations, loop limits, and various flags
    global throttle_pin 
    throttle_pin = "P9_14" #throttle pin

    # Steering
    global steering_pin
    steering_pin = "P9_16"  #stering pin

    global max_ticks
    max_ticks = 8000    #Max number of loop iterations

    # Booleans for handling stop light
    global passed_stop_light
    passed_stop_light = False   #flag for passing a stop light
    global at_stop_light
    at_stop_light = False   #flag for passing stop light
    global passed_first_stop_sign
    passed_first_stop_sign = False  #flag for passing first stop sign

    global PWM
    PWM = pwm.Autonomous_Car_PWM() # Placeholder for PWM class (assuming it's defined elsewhere)
    global steering_pid
    steering_pid = Error_PID_Controller()  # Placeholder for steering PID controller
    global speed_pid
    speed_pid = Error_PID_Controller()  # Placeholder for speed PID controller

    # Boolean for stopping the car
    global stopped
    stopped = True

    # Counter for loop iterations
    global counter
    counter = 0

    # Arrays for storing PID values and PWM values for plotting graphs
    global p_vals
    p_vals = []
    global d_vals
    d_vals = []
    global err_vals
    err_vals = []
    global speed_pwm
    speed_pwm = []
    global steer_pwm
    steer_pwm = []
    global speed_p_vals
    speed_p_vals = []
    global speed_d_vals
    speed_d_vals = []
    global speed_err_vals
    speed_err_vals = []

    global stopSignCheck
    stopSignCheck = 1
    global sightDebug
    sightDebug = False
    global isStop2SignBool
    isStopSignBool = False

    global avg
    avg = 0
    global results
    results = []


'''Functions that get hsv boundaries and success boundaries;
returns lower color and success boundaries, and upper color and success boundaries
'''
def getRedFloorBoundaries():
    # Function to get HSV boundaries for detecting red floor
    return getBoundaries("redboundaries.txt")


def isRedFloorVisible(frame):
    # Function to detect if red floor is visible in the frame
    boundaries = getRedFloorBoundaries()
    return isMostlyColor(frame, boundaries)


def getTrafficRedLightBoundaries():
    # Function to get HSV boundaries for detecting red stop light
    return getBoundaries("trafficRedBoundaries.txt")


def isTrafficRedLightVisible(frame):
    #Detects whether or not we can see a stop sign in frame
    boundaries = getTrafficRedLightBoundaries()
    #return True if the camera sees a stop light, false otherwise
    return isMostlyColor(frame, boundaries)


def getTrafficGreenLightBoundaries():
    # Function to get HSV boundaries for detecting green light
    return getBoundaries("trafficGreenboundaries.txt")


def isTrafficGreenLightVisible(frame):
    #Detects whether or not we can see a green light in frame
    boundaries = getTrafficGreenLightBoundaries()
    #return True if camera sees green light, false otherwise
    return isMostlyColor(frame, boundaries)


def isMostlyColor(image, boundaries):
    '''
    Detects whether or not the majority of a color on the screen is a particular color
    '''
    #Convert to HSV color space
    hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    #parse out the color boundaries and the success boundaries
    color_boundaries = boundaries[0]
    percentage = boundaries[1]

    lower = np.array(color_boundaries[0])
    upper = np.array(color_boundaries[1])

    #Create a mask for the color boundaries
    mask = cv2.inRange(hsv_img, lower, upper)
    output = cv2.bitwise_and(hsv_img, hsv_img, mask=mask)

    #Calculate what percentage of image falls between color boundaries
    percentage_detected = np.count_nonzero(mask) * 100 / np.size(mask)

    #Check if the percentage detected is between the success boundaries
    result = percentage[0] < percentage_detected <= percentage[1]

    #return result of percentage between success boundaries and output of color boundary mask
    return result, output


def getBoundaries(filename):
    '''
    Reads boundaries from 'filename' and returns lower and upper color and success boundaries.
    '''
    # Default percentage values for lower and upper boundaries
    default_lower_percent = 50
    default_upper_percent = 100

    # Open the file for reading
    with open(filename, "r") as f:
        # Read lines from the file
        boundary_lines = f.readlines()

        # Split the lower and upper data using comma as a delimiter
        lower_data = [val for val in boundary_lines[0].split(",")]
        upper_data = [val for val in boundary_lines[1].split(",")]

        # Check if lower_data and upper_data have the percentage values
        lower_percent = float(lower_data[3]) if len(lower_data) >= 4 else default_lower_percent
        upper_percent = float(upper_data[3]) if len(upper_data) >= 4 else default_upper_percent

        # Convert data to integer values
        lower_boundaries = [int(x) for x in lower_data[:3]]
        upper_boundaries = [int(x) for x in upper_data[:3]]

        # Form boundaries and percentages lists
        boundaries = [lower_boundaries, upper_boundaries]
        percentages = [lower_percent, upper_percent]

    return boundaries, percentages


#intializes car before running
intialize_car():
    # default value is 7.75% duty at 50Hz to throttle
    PWM.default_vals(throttle_pin)
    # wait for car to be ready
    print("Set throttle to default value, press enter when ESC calibrated", flush = True)
    input()
    #set PWM to default values by passing in steering pin
    PWM.default_vals(steering_pin)
    return

#stops the car
def stop():
    #calls to throttle pin
    PWM.default_vals(throttle_pin)
    #sets global stopped variable to true
    global stopped
    stopped = True

#sends car forward
def go():
    global stopped
    #sets stopped variable to false so car is not stopped
    stopped = False


def detect_edges(frame):
    """
    Detects edges in a frame using Canny edge detection after filtering for blue lane lines.

    :param frame: Input frame (BGR format)
    :return: Edges detected in the frame
    """
    # Convert the frame to HSV color space for better color-based filtering
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Define the lower and upper bounds for blue color in HSV
    lower_blue = np.array([50, 0, 0], dtype="uint8")
    upper_blue = np.array([255, 255, 255], dtype="uint8")

    # Create a mask to filter out only the blue lane lines
    mask = cv2.inRange(hsv, lower_blue, upper_blue)

    # Apply Canny edge detection to the masked image
    edges = cv2.Canny(mask, 100, 200)

    return edges

def region_of_interest(edges):
    """
    Defines a region of interest (ROI) by creating a mask and focusing on the lower half of the screen.

    :param edges: Edges detected in the frame
    :return: Cropped edges within the defined ROI
    """
    height, width = edges.shape

    # Create a mask with zeros
    mask = np.zeros_like(edges)

    # Define a polygon representing the lower half of the screen
    polygon = np.array([[
        (0, height),
        (0, height // 2),
        (width, height // 2),
        (width, height),
    ]], np.int32)

    # Fill the polygon with white color (255)
    cv2.fillPoly(mask, polygon, 255)

    # Apply bitwise AND operation to obtain cropped edges within the defined ROI
    cropped_edges = cv2.bitwise_and(edges, mask)

    return cropped_edges


def detect_line_segments(cropped_edges):
    """
    Detects line segments in the cropped edges using Hough Line Transform.

    :param cropped_edges: Edges within the defined ROI
    :return: Detected line segments
    """
    rho = 1
    theta = np.pi / 180
    min_threshold = 10

    # Use Hough Line Transform to detect line segments
    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=150)

    return line_segments

def detect_lane_lines(frame, line_segments):
    """
    Detects and averages lane lines based on line segments.

    :param frame: Input frame
    :param line_segments: Detected line segments
    :return: A list of lane lines represented by their slope and intercept
    """
    lane_lines = []

    # Check if no line segments are detected
    if line_segments is None:
        print("No line segments detected", flush=True)
        return lane_lines

    height, width, _ = frame.shape
    left_lines = []
    right_lines = []

    # Define the left and right region boundaries
    boundary_fraction = 1 / 3
    left_region_boundary = width * (1 - boundary_fraction)
    right_region_boundary = width * boundary_fraction

    # Iterate through each line segment
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            # Skip vertical lines
            if x1 == x2:
                print("Skipping vertical lines (slope = infinity)", flush=True)
                continue

            # Fit a line to the points
            line_params = np.polyfit((x1, x2), (y1, y2), 1)
            slope, intercept = line_params
            line = (slope, intercept)

            # Classify lines into left and right based on slope and region boundaries
            if slope < 0 and x1 < left_region_boundary and x2 < left_region_boundary:
                left_lines.append(line)
            elif slope > 0 and x1 > right_region_boundary and x2 > right_region_boundary:
                right_lines.append(line)

    # Average the slope and intercept for left and right lines
    left_average_line = np.average(left_lines, axis=0) if left_lines else None
    right_average_line = np.average(right_lines, axis=0) if right_lines else None

    # Convert averaged lines to points and add to detect_lane_lines
    if left_average_line:
        detect_lane_lines.append(make_lane_points(frame, left_average_line))
    if right_average_line:
        detect_lane_lines.append(make_lane_points(frame, right_average_line))

    return detect_lane_lines



def make_lane_points(frame, line):
    """
    Generates lane points from a line equation.

    :param frame: Input frame
    :param line: Line represented by slope and intercept
    :return: List of points representing the lane line
    """
    height, width, _ = frame.shape

    slope, intercept = line

    y1 = height  # Bottom of the frame
    y2 = int(y1 / 2)  # Points from the middle of the frame down

    # Ensure the slope is not zero to avoid division by zero
    if slope == 0:
        slope = 0.1

    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]


def draw_lane_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    """
    Draws lane lines on a frame.

    :param frame: Input frame
    :param lines: List of lines represented by points
    :param line_color: Color of the drawn lines (BGR format)
    :param line_width: Width of the drawn lines
    :return: Image with the lane lines drawn
    """
    lane_image = np.zeros_like(frame)

    # Check if lines are not None
    if lines is not None:
        # Draw each line on the lane_image
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(lane_image, (x1, y1), (x2, y2), line_color, line_width)

    # Combine the lane_image with the original frame
    lane_image = cv2.addWeighted(frame, 0.8, lane_image, 1, 1)

    return lane_image


def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    """
    Displays a heading line on the frame based on the steering angle.

    :param frame: Input frame
    :param steering_angle: Steering angle in degrees
    :param line_color: Color of the heading line (BGR format)
    :param line_width: Width of the heading line
    :return: Image with the heading line drawn
    """
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    # Convert steering angle to radians
    steering_angle_radian = steering_angle / 180.0 * math.pi

    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    # Draw the heading line on the heading_image
    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    
    # Combine the heading_image with the original frame
    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image


def calculate_steering_angle(frame, detect_lane_lines):
    """
    Calculates the steering angle based on detected lane lines.

    :param frame: Input frame
    :param detect_lane_lines: List of detected lane lines
    :return: Calculated steering angle in degrees
    """
    height, width, _ = frame.shape

    if len(detect_lane_lines) == 2:
        _, _, left_x2, _ = detect_lane_lines[0][0]
        _, _, right_x2, _ = detect_lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)

    elif len(detect_lane_lines) == 1:
        x1, _, x2, _ = detect_lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)

    elif len(detect_lane_lines) == 0:
        x_offset = 0
        y_offset = int(height / 2)

    # Calculate angle to the middle of the frame
    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    
    # Adjust and return the steering angle
    steering_angle = angle_to_mid_deg + 90
    return steering_angle


import matplotlib.pyplot as plt
import numpy as np

def plot_pd(p_vals, d_vals, error, show_img=False):
    """
    Plots P, D, and Error values over time.

    :param p_vals: List of P values
    :param d_vals: List of D values
    :param error: List of Error values
    :param show_img: Boolean flag to display the image (default is False)
    """
    # Create a subplot with shared x-axis
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals) - 10)

    # Plot P and D values on the primary y-axis
    ax1.plot(t_ax, p_vals[10:], '-', label="P values")
    ax1.plot(t_ax, d_vals[10:], '-', label="D values")

    # Create a secondary y-axis for Error values
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error[10:], '--r', label="Error")

    # Set labels and limits
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error Value")

    # Add title and legend
    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()

    # Save or display the plot
    if show_img:
        plt.show()
    else:
        plt.savefig("pd_plot.png")

    # Clear the figure to avoid overlapping plots
    plt.clf()

def plot_speed_pd(p_vals, d_vals, error, show_img=False):
    """
    Plots P, D, Sum PD, and Error values over time.

    :param p_vals: List of P values
    :param d_vals: List of D values
    :param error: List of Error values
    :param show_img: Boolean flag to display the image (default is False)
    """
    # Create a subplot with shared x-axis
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals) - 10)

    # Plot P and D values on the primary y-axis
    ax1.plot(t_ax, p_vals[10:], '-', label="P values")
    ax1.plot(t_ax, d_vals[10:], '-', label="D values")

    # Calculate and plot the sum of P and D values (Sum PD)
    sum_vals = [sum(value) for value in zip(p_vals, d_vals)]
    ax1.plot(t_ax, sum_vals[10:], '-', label="Sum PD values")

    # Create a secondary y-axis for Error values
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error[10:], '--r', label="Error")

    # Set labels
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylabel("Error Value")

    # Add title and legend
    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()

    # Save or display the plot
    if show_img:
        plt.show()
    else:
        plt.savefig("speed_pd_plot.png")

    # Clear the figure to avoid overlapping plots
    plt.clf()


def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    """
    Plots Speed PWM, Steering PWM, and Error values over time.

    :param speed_pwms: List of Speed PWM values
    :param turn_pwms: List of Steering PWM values
    :param error: List of Error values
    :param show_img: Boolean flag to display the image (default is False)
    """
    # Create a subplot with shared x-axis
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms) - 10)

    # Plot Speed PWM and Steering PWM values on the primary y-axis
    ax1.plot(t_ax, speed_pwms[10:], '-', label="Speed PWM")
    ax1.plot(t_ax, turn_pwms[10:], '-', label="Steering PWM")

    # Create a secondary y-axis for Error values
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error[10:], '--r', label="Error")

    # Set labels
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PWM Values")
    ax2.set_ylabel("Error Value")

    # Add title and legend
    plt.title("PWM Values over time")
    fig.legend()

    # Save or display the plot
    if show_img:
        plt.show()
    else:
        plt.savefig("pwm_plot.png")

    # Clear the figure to avoid overlapping plots
    plt.clf()

def get_encoder_time():
    """
    Reads the encoder value from the driver.

    :return: Encoder value as an integer
    """
    # Open the encoder driver
    driver_file = open("/dev/encoder_driver")

    # Read from the driver
    line = driver_file.readline()

    # Convert to an integer
    line = line.strip()
    value = int(line)

    # Close the driver
    driver_file.close()

    # Return the encoder value
    return value


def update_throttle():
    """
    Updates the throttle value based on the Speed PID controller.

    Global Variables:
    - speed_pid: Speed PID controller instance
    - throttle_pin: Pin for throttle control
    - stopped: Flag indicating whether the vehicle is stopped
    - speed_pwm: List to store Speed PWM values
    - speed_p_vals: List to store Speed P values
    - speed_d_vals: List to store Speed D values
    - speed_err_vals: List to store Speed error values
    - PWM: PWM controller instance
    - counter: Counter for controlling initial cycles

    :return: None
    """
    # TODO this
    global speed_pid
    global throttle_pin
    global stopped
    global speed_pwm
    global speed_p_vals
    global speed_d_vals
    global speed_err_vals
    global PWM
    global counter

    # Get value from PID
    pid_val = speed_pid.get_output_val()
    new_cycle = 8.0 + pid_val

    # Ensure new_cycle is within valid range
    if new_cycle > 8.9:
        new_cycle = 8.4  # Full speed
        print("Speed PID value too large", flush=True)
    elif new_cycle < 8.0:
        new_cycle = 8.0  # Stopped
        print("Speed PID value underflow", flush=True)

    # Adjust PWM values based on vehicle state
    if stopped:
        PWM.default_vals(throttle_pin)  #if stopped
        speed_pwm.append(8.0)
    else:
        if counter < 10:
            PWM.set_duty_cycle(throttle_pin, 8.2)   #if counter is less than 10
        else:
            PWM.set_duty_cycle(throttle_pin, 8.2)   #if counter is greater than 10
        speed_pwm.append(new_cycle)

    # Update lists with PID values
    speed_p_vals.append(speed_pid.get_p_gain() * speed_pid.get_error())
    speed_d_vals.append(speed_pid.get_d_gain() * speed_pid.get_error_derivative())
    speed_err_vals.append(speed_pid.get_error())

    return


def update_steering():
    """
    Update the steering based on PID controller output.

    :return: None
    """
    global steering_pid
    global steering_pin
    global p_vals
    global d_vals
    global err_vals
    global steer_pwm
    global PWM

    # Get PID controller output
    pid_val = steering_pid.get_output_val()

    # Adjust the new PWM cycle for steering
    new_cycle = 7.5 + pid_val

    # Clip the values to stay within valid steering range
    if new_cycle > 9:
        new_cycle = 9
        print("Steering PID Output exceeds max steering val", flush=True)
    elif new_cycle < 6:
        new_cycle = 6
        print("Steering PID Output is lower than minimum steering value", flush=True)

    # Set the duty cycle for the steering servo
    PWM.set_duty_cycle(steering_pin, new_cycle)

    # Record PID components and PWM value
    p_vals.append(steering_pid.get_p_gain() * steering_pid.get_error())
    d_vals.append(steering_pid.get_d_gain() * steering_pid.get_error_derivative())
    err_vals.append(steering_pid.get_error())
    steer_pwm.append(new_cycle)

    return


def init_video():
    """
    Initialize the video capture.

    :return: None
    """
    global video

    # Set up video capture
    video = cv2.VideoCapture(2)
    video.set(cv2.CAP_PROP_FRAME_WIDTH, 160)    #set frame width
    video.set(cv2.CAP_PROP_FRAME_HEIGHT, 120)   #set frame height

    # Wait for video to load
    time.sleep(1)



def init_pids():
    """
    Initialize PID controllers for steering and speed.

    :return: None
    """
    global steering_pid
    global speed_pid

    # Set PID gains for steering controller
    steering_pid.set_p_gain(0.036)
    steering_pid.set_i_gain(0)
    steering_pid.set_d_gain(0.01)

    # Set PID gains for speed controller
    speed_pid.set_p_gain(0.0052)  # was 0.006
    speed_pid.set_i_gain(0.0)
    speed_pid.set_d_gain(-0.006)

    # Set the target angle for steering PID (degrees from straight)
    steering_pid.set_target(0)

    # Set the target speed for speed PID (convert times from ns to us)
    speed_pid.set_target(200)  # set to 80 ms

    # tune PIDs here

def main_init():
    """
    Perform main initialization tasks.

    :return: None
    """
    define_globals() #global
    initialize_car() #car
    init_video() #video
    init_pids() #pids

def main_loop():
    """
    Main control loop for the autonomous car.

    :return: None
    """
    go()
    #initialize global variables 
    global p_vals
    global d_vals
    global err_vals
    global speed_pwm
    global steer_pwm
    global counter
    global stopSignCheck
    global passed_first_stop_sign
    global secondStopSignTick
    global steering_pid
    global speed_pid
    global avg
    global results
    global stopClicks

    stopClicks = -1

    #while counter is less than max ticks
    while counter < max_ticks:
        # print counter value to console
        print(counter, flush=True)
        # print("Reading Video")
        # read frame
        ret, original_frame = video.read()
        # print("read video", flush=True)
        # copy/resize frame
        frame = cv2.resize(original_frame, (160, 120))
        # print("resized video", flush=True)
        if sightDebug:
            cv2.imshow("Resized Frame", frame)

        # check for stop sign/traffic light every couple ticks
        if ((counter + 1) % stopSignCheck) == 0:

            # check for the first stop sign
            if not passed_first_stop_sign:
                isStopSignBool, floorSight = isRedFloorVisible(frame)
                cv2.imshow("floorSight", floorSight)    #show floor
                if isStopSignBool and stopClicks == -1:
                    stopClicks = counter + 15   #update stopClicks
                if stopClicks == counter:
                    print("Detected first stop sign, stopping", flush=True)
                    stop()
                    time.sleep(2)   #pause after seeing first stop sign
                    go()
                    passed_first_stop_sign = True
                    # this is used to not check for the second stop sign until many frames later
                    secondStopSignTick = counter + 100
                    # now check for stop sign less frequently
                    stopSignCheck = 3
                    print("first stop finished!", flush=True)
                    stopClicks = -1
            # check for the second stop sign
            elif passed_first_stop_sign and counter > secondStopSignTick:
                isStop2SignBool, floorSight = isRedFloorVisible(frame)
                cv2.imshow("floorSight", floorSight)
                if isStop2SignBool and stopClicks == -1:
                    stopClicks = (counter + 15) #update stopClicks
                    print("Detected Second Stop Sign!")
                if counter == stopClicks:
                    # last stop sign detected, exits while loop
                    print("detected second stop sign, stopping", flush=True)
                    stop()
                    break

        # Process the frame to determine the desired steering angle

        # Detect edges in the frame
        edges = detect_edges(frame)
        # Define a region of interest (ROI) based on the detected edges
        roi = region_of_interest(edges)
        # Detect line segments within the ROI
        line_segments = detect_line_segments(roi)
        # Calculate the average slope and intercept of the detected line segments to represent lane lines
        detect_lane_lines = average_slope_intercept(frame, line_segments)
        # Draw the detected lane lines on the original frame
        lane_lines_image = draw_lane_lines(frame, detect_lane_lines)
        # Calculate the steering angle based on the detected lane lines
        steering_angle = calculate_steering_angle(frame, detect_lane_lines)
        # Display a visual representation of the heading line on the frame
        heading_image = display_heading_line(detect_lane_lines_image, steering_angle)
        # Show the frame with the heading line for visualization
        cv2.imshow("heading line", heading_image)

        # calculate changes for PID
        if sightDebug:
            cv2.imshow("Cropped sight", roi)
        deviation = steering_angle - 90     #positive dev means turn right,negative dev means turn left

        # PID Code for steering angle control
        # Update the PID controller with the deviation (difference between desired and current steering angles)
        steering_pid.update_pid(deviation)

        # Get the current time from the encoder (measured in nanoseconds)
        encoder_time = get_encoder_time()

        # Convert time from nanoseconds to microseconds
        encoder_time = encoder_time / 1000.0

        # Calculate the average encoder time over a specific range of frames
        if counter > 6 and counter <= 10:   #if counter is greater than 6 or less than or equal to 10
            results.append(encoder_time)
            avg = sum(results) / len(results)
        elif counter > 10:  #if counter is greater than 10
            results = results[1:]
            results.append(encoder_time)
            avg = sum(results) / len(results)
        else:
            avg = 10000000

        # Convert the average encoder time to an average speed in microseconds per frame
        temp_avg = 1000000.0 / avg

        # Print the average speed for the current loop
        print("Average speed for loop: ", temp_avg)

        # Update the PID controller for speed control using the calculated average speed
        speed_pid.update_pid(temp_avg)

        # Update the throttle based on the speed PID controller output
        update_throttle()

        # Update the steering based on the steering PID controller output
        update_steering()

        # Wait for a short time (10 milliseconds) to control the loop frequency
        cv2.waitKey(10)

        # Increment the counter for the loop
        counter += 1

def cleanup():
    """
    Clean up resources and perform necessary actions before exiting the program.
    """
    # Release the video capture object
    video.release()

    # Close all OpenCV windows
    cv2.destroyAllWindows()

    # Set default PWM values for both throttle and steering
    PWM.default_vals(throttle_pin)
    PWM.default_vals(steering_pin)

    # Plot and display PID-related graphs before exiting
    plot_pd(p_vals, d_vals, err_vals, True)
    plot_speed_pd(speed_p_vals, speed_d_vals, speed_err_vals, True)
    plot_pwm(speed_pwm, steer_pwm, err_vals, True)


# Main Script
# Initialize the car, video capture, and PID controllers
main_init()

try:
    # Start the main loop for processing frames and controlling the car
    main_loop()

except KeyboardInterrupt:
    # Handle KeyboardInterrupt to exit the program gracefully
    print("Received command to exit early!", flush=True)

finally:
    # Clean up resources and perform necessary actions before exiting
    cleanup()

pwm.py

Python
'''
Team: PiRobot
Authors: Matthew Gibson, Matthew Parker, Ellie Chen, Gabriela Franco
Final Project: Autonomous RC Car
Fall 2023

The following code is based on: https://www.hackster.io/barely-beagle/barely-beagle-6f4353
'''

class Autonomous_Car_PWM():

    def set_duty_cycle(self, pin, cycle):
        """
        Sets the duty cycle for specified pin.

        Pin should be either "P9_14 or P9_16".
        Cycle should be a numeric type between 5 and 10.

        Returns true upon success or false on failure.

        Based on: https://www.hackster.io/barely-beagle/barely-beagle-6f4353
        """

        #if cycle is between 5-10, calculate period and write to beaglebone
        if 5 <= cycle <= 10
            #calculate the period (T) based on given duty cycle
            T = int((cycle / 100.0) * 20000000)
            T = int(T - (T % 1000))

            #write calculated period to the appropriate PWM file
            if pin == "P9_14":
                with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
                    filetowrite.write(T.__str__())
                return True
            elif pin == "P9_16":
                with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:
                    filetowrite.write(T.__str__())
                return True
            else:
                return False

        #if cycle is less than 5 or greater than 10, return False
        else:   
            return False 
        


    def default_vals(self, pin):
        """
        Resets specified pin's PWM to default values (stopped or centered).
        Pin must be either "P9_14" or "P9_16".
        Returns true upon success or false on failure.

        """
        #if pin is P9_14, write duty cycle to beaglebone
        if pin == "P9_14": # motor
            with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:
                filetowrite.write("1550000")
            return True     #returns true if duty cycle for motor writes

        elif pin == "P9_16": # steering
            with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:
                filetowrite.write("1500000")
            return True     #returns true if duty cycle for steering writes
        else:
            return False    #returns False if pin isn't P9_14 or P9_16

pid.py

Python
'''
Team: PiRobot
Authors: Matthew Gibson, Matthew Parker, Ellie Chen, Gabriela Franco
Final Project: Autonomous RC Car
Fall 2023
'''
class Error_PID_Controller():
    """
    This class implements a PID Controller that operates on the error between a desired value and the current value.
    """

    # Desired value that the controller is trying to achieve
    desired = 0.0
    
    # Store PID gains
    p_gain = 0.0
    i_gain = 0.0
    d_gain = 0.0

    # Variable to calculate derivative
    last_error = 0.0
    
    # Current sample or measurement from the system
    current_sample = 0.0
    
    # Accumulated integral of the error
    integral = 0.0
    
    # Calculated derivative of the error
    derivative = 0.0

    # Output value calculated by the PID controller
    output = 0.0


class Error_PID_Controller():
    """
    This class implements a PID Controller that operates on the error between a desired value and the current value.
    """

    # (Previous comments...)

    def set_target(self, target):
        """
        Set a new target value and reset the integral counter.

        :param target: The new target value for the controller.
        :return: True if the operation is successful.
        """
        self.desired = target
        self.integral = 0.0
        return True

    def set_p_gain(self, gain):
        """
        Set the proportional gain.

        :param gain: The new proportional gain value.
        :return: True if the operation is successful.
        """
        self.p_gain = gain
        return True

    def get_p_gain(self):
        """
        Get the current proportional gain.

        :return: The current proportional gain value.
        """
        return self.p_gain


    def set_i_gain(self, gain):
        """
        This method sets the integral gain
        """
        self.i_gain = gain
        return True

    def get_i_gain(self):
        """
        This method gets the integral gain
        """
        return self.i_gain


    def set_d_gain(self, gain):
        """
        This method sets the derivative gain
        """
        self.d_gain = gain
        return True

    def get_d_gain(self):
        """
        Get the current derivative gain.

        :return: The current derivative gain value.
        """
        return self.d_gain

    def get_error(self):
        """
        Get the last error value.

        :return: The last error value.
        """
        return self.last_error

    def get_error_derivative(self):
        """
        Get the derivative of the error.

        :return: The derivative of the error value.
        """
        return self.derivative

    def get_error_integral(self):
        """
        Get the integral of the error.

        :return: The integral of the error value.
        """
        return self.integral

    def get_output_val(self):
        """
        Get the current PID output value.

        :return: The current PID output value.
        """
        return self.output


    def update_pid(self, new_val):
        """
        Update the PID controller with a new sample and compute new values.

        :param new_val: The new sample value to be used for the update.
        :return: The new controller output value, which can also be retrieved from the get_output_val method.
        """
        # Calculate the error between the desired value and the new sample
        error = self.desired - new_val

        # Update the integral term with the current error
        self.integral = self.integral + error

        # Compute the derivative of the error
        self.derivative = error - self.last_error

        # Update the last error with the current error
        self.last_error = error

        # Calculate the overall output using the PID formula
        sum = 0
        sum = sum + self.integral * self.i_gain
        sum = sum + self.derivative * self.d_gain
        sum = sum + error * self.p_gain
        self.output = sum

        return self.output


    

plot_results.py

Python
'''
Team: PiRobot
Authors: Matthew Gibson, Matthew Parker, Ellie Chen, Gabriela Franco
Final Project: Autonomous RC Car
Fall 2023
'''

import matplotlib.pyplot as plt
import numpy as np
import time

def get_encoder_time():
    """
    Get the encoder time value by reading from the driver.

    :return: The encoder time value as an integer.
    """
    # Open the driver
    driver_file = open("/dev/encoder_driver")
    
    # Read from the driver
    line = driver_file.readline()
    
    # Convert to an integer
    line = line.strip()
    value = int(line)
    
    # Close the driver
    driver_file.close()
    
    # Return the value
    return value

#initialize var and arrays with num samples, window length, results, and avg
num_samples = 500	#num samples = 500
window_len = 10	#window len = 10
results = []	#empty results array
avg = []	#empty avg array

for i in range(num_samples):
    # Get the new encoder time value
    new_time = 1000000.0 / get_encoder_time()
    results.append(new_time)

    if i == 0:
        # Initialize the window with the first value
        window = np.full(window_len, new_time)
    else:
        # Update the window with the new value
        window[i % window_len] = new_time
    
    # Calculate the moving average
    avg.append(np.average(window))

    # Sleep for a short duration
    time.sleep(0.01)

# Plot the results using Matplotlib
fig, ax1 = plt.subplots()

# Generate time axis based on the number of samples
t_ax = np.arange(len(results))

# Plot the raw samples on the primary y-axis
ax1.plot(t_ax, results, '-', label="Samples")

# Create a secondary y-axis for the moving average
ax2 = ax1.twinx()
ax2.plot(t_ax, avg, '--r', label="Avg")

# Set labels for the axes
ax1.set_xlabel("loop")
ax1.set_ylabel("Sample val")
ax2.set_ylabel("Avg val")

# Set the title of the plot
plt.title("Samples over loops")

# Display the legend
fig.legend()

# Adjust layout to prevent clipping of labels
fig.tight_layout()

# Save the plot as an image file
plt.savefig("plot_plot.png")

# Clear the figure to prepare for the next plot
plt.clf()

pwm_init.py

Python
'''
Team: PiRobot
Authors: Matthew Gibson, Matthew Parker, Ellie Chen, Gabriela Franco
Final Project: Autonomous RC Car
Fall 2023

 This file initializes the Beaglebone PWM.
'''

# P9_14 - Speed/ESC: initializes the main motor ESC to 7.75% duty cycle
with open('/dev/bone/pwm/1/a/period', 'w') as filetowrite:  # Open the PWM period file and set the period to 20,000,000 nanoseconds (50 Hz frequency)
    filetowrite.write('20000000')
with open('/dev/bone/pwm/1/a/duty_cycle', 'w') as filetowrite:  # Open the PWM duty_cycle file and set the duty cycle to 1,550,000 nanoseconds (7.75% duty cycle)
    filetowrite.write('1550000')
with open('/dev/bone/pwm/1/a/enable', 'w') as filetowrite:  # Open the PWM enable file and set it to '1' to enable the PWM signal
    filetowrite.write('1')

# P9_16 - Steering: initializes the Steering servo to the center (7.5% duty cycle)
with open('/dev/bone/pwm/1/b/period', 'w') as filetowrite:  # Open the PWM period file and set the period to 20,000,000 nanoseconds (50 Hz frequency)
    filetowrite.write('20000000')
with open('/dev/bone/pwm/1/b/duty_cycle', 'w') as filetowrite:  # Open the PWM duty_cycle file and set the duty cycle to 1,500,000 nanoseconds (center position)
    filetowrite.write('1500000')
with open('/dev/bone/pwm/1/b/enable', 'w') as filetowrite:  # Open the PWM enable file and set it to '1' to enable the PWM signal
    filetowrite.write('1')

encoder_driver.c

C/C++
#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/device.h>
#include <linux/uaccess.h>
#include <linux/mutex.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/irq.h>
#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/ktime.h>
#include <linux/timekeeping.h>

/* Code written referencing course materials and https://github.com/Johannes4Linux/Linux_Driver_Tutorial/blob/main/11_gpio_irq/gpio_irq.c */

#define DEVICE_NAME "encoder_driver"
#define CLASS_NAME "elec424"

struct device *dev;
struct gpio_desc *button_desc;
int irq;

static volatile u64 last_time;
static volatile u64 curr_time;

static int major_number;
static struct class *encoder_driver_class = NULL;
static struct device *encoder_driver_device = NULL;
static int times_called = 0;
static char message[256] = {0};
static int last_diff = 0;
static short size_of_message;

static DEFINE_MUTEX(encoder_mutex);
static DEFINE_MUTEX(interrupt_mutex);

static int already_read;

/**
 * File operations declaration
 */
static int device_open(struct inode *, struct file *);
static ssize_t device_read(struct file *, char __user *, size_t, loff_t *);
static int device_release(struct inode *, struct file *);
/**
 * ISR Declaration
 */
static irq_handler_t encoder_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs);

static struct file_operations fops = 
{
    .open = device_open,
    .read = device_read,
    .release = device_release,
};

static int __init hello_init(void){
    major_number = register_chrdev(0, DEVICE_NAME, &fops);
    encoder_driver_class = class_create(THIS_MODULE, CLASS_NAME);
    encoder_driver_device = device_create(encoder_driver_class, NULL, MKDEV(major_number, 0), NULL, DEVICE_NAME);
    printk("init function has been called!\n");
    mutex_init(&encoder_mutex);
    return 0;
}

static void __exit hello_exit(void){
    device_destroy(encoder_driver_class, MKDEV(major_number,0));
    class_unregister(encoder_driver_class);
    class_destroy(encoder_driver_class);
    unregister_chrdev(major_number, DEVICE_NAME);
    mutex_destroy(&encoder_mutex);
    printk("exit function has been called!\n");
}

static int device_open(struct inode *inodep, struct file *filep){
	if(!mutex_trylock(&encoder_mutex)){
		printk(KERN_ALERT "I'm being used!\n");
		return -EBUSY;
	}
    times_called++;
    printk("encoder character device opened!");
    return 0;
}

static ssize_t device_read(struct file *filep, char __user *buf, size_t length, loff_t *offset){
	long error_count;
	u64 call_time;
	u64 call_diff;
	if(already_read == 1)
	{
		return 0; //send EOF
	}
	already_read = 1;
	call_time = ktime_get_ns();
	call_diff = call_time - last_time;
	printk("User queried driver, call_diff = %lu, last_diff = %lu\n", (unsigned long) call_diff, (unsigned long) last_diff);
	while(mutex_is_locked(&interrupt_mutex));
	mutex_lock(&interrupt_mutex);
	if (call_diff > 3*last_diff)
	{
		printk("Updating message because of call_diff\n!");
		last_diff = call_diff;
		sprintf(message, "%lu\n", (unsigned long) call_diff);
		size_of_message = strlen(message);
	}
	error_count = copy_to_user(buf, message, size_of_message);
	mutex_unlock(&interrupt_mutex);
	printk("Sent %d characters to user!\n", size_of_message);
	printk("User should have received message: %s\n", message);
	return size_of_message;
}

static int device_release(struct inode *inodep, struct file *filep)
{
	mutex_unlock(&encoder_mutex);
	already_read = 0;
	printk("Device released!\n");
	return 0;
}

/**
 * Probe function
 */
static int encoder_probe(struct platform_device *pdev)
{
	//declare variables
	int ret;
	mutex_init(&interrupt_mutex);
	printk("Starting probe function!\n");
	hello_init();
	//get device
	dev = &(pdev->dev);
	//verify device
	if(dev == NULL)
	{
		printk("Failed to get dev\n");
		return -1;
	}
	// get descriptors for pins
	button_desc = devm_gpiod_get(dev, "encoder", GPIOD_IN);
	if(button_desc == NULL)
	{
		printk("Failed to get encoder gpio descriptor\n");
		return -1;
	}
	//set debounce
	gpiod_set_debounce(button_desc, 1200000);
	// get irq number
	irq = gpiod_to_irq(button_desc);
	if(irq < 0)
	{	
		printk("Failed to get IRQ number\n");
		return -1;
	}
	//assign irq
	ret = request_irq((unsigned int) irq, (irq_handler_t) encoder_irq_handler, IRQF_TRIGGER_RISING, "anything", NULL);
	if(ret < 0)
	{
		printk("Failed to install irq\n");
		return -1;
	}
	last_time = ktime_get_ns();
	//should read 0 ns
	sprintf(message, "%lu\n", (unsigned long) last_time);
	size_of_message = strlen(message);
	printk("Initial message reads %s!\n", message);
	last_diff = last_time;
	already_read = 0;
	//print installation message
	printk("Driver Installed!\n");
	return 0;
}

// remove function
static int encoder_remove(struct platform_device *pdev)
{
	free_irq(irq, NULL);
	printk("Removed driver!\n");	
	hello_exit();
	return 0;
}

// set compatibale to get leds from device tree
static struct of_device_id matchy_match[] = {
    {.compatible = "encoder-driver"},
    {/* leave alone - keep this here (end node) */},
};

// platform driver object
static struct platform_driver adam_driver = {
	.probe	 = encoder_probe,
	.remove	 = encoder_remove,
	.driver	 = {
	       .name  = "The Rock: this name doesn't even matter",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match,
	},
};

module_platform_driver(adam_driver);

// define interrupt handler
static irq_handler_t encoder_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs)
{
	//print message
	u64 diff;
	printk("encoder_irq: Encoder interrupt triggered!\n");
	curr_time = ktime_get_ns();
	diff = curr_time - last_time;
	if (diff > 1000000)
	{
		//only count when greater than 1 ms
		//set last to curr
		while(mutex_is_locked(&interrupt_mutex));
		mutex_lock(&interrupt_mutex);
		last_time = curr_time;
		sprintf(message, "%lu\n", (unsigned long) diff);
		mutex_unlock(&interrupt_mutex);
		size_of_message = strlen(message);
		printk("Detected an encode press, time difference was %lu nanoseconds", (unsigned long) diff);
		last_diff = diff;
	}
	return (irq_handler_t) IRQ_HANDLED;
}

MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("Josh Stidham");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");
MODULE_VERSION("0.000001");

Credits

Matthew Parker
1 project • 0 followers
Gabriela Franco
0 projects • 0 followers
elliecchen
0 projects • 0 followers
Matt Gibson
0 projects • 0 followers

Comments