Zain NazirCamden GrahamZhongqi GaoLeonardo Nunez
Published

Really Interesting Super Cool Car (Cardboard Box)

We built a car that autonomously follows lane lines on a track using a camera vision system and PD controller running on a Raspberry Pi 5.

AdvancedFull instructions provided92
Really Interesting Super Cool Car (Cardboard Box)

Things used in this project

Hardware components

Raspberry Pi 5
Raspberry Pi 5
×1
Barebones RC Car with DC Motor and Servo Motor
×1
Portable Battery Bank
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Optilcal Speed Encoder
×1

Software apps and online services

OpenCV
OpenCV

Hand tools and fabrication machines

Cardboard Cutters (Scissors)
Cardboard Fasteners (Tape)

Story

Read more

Code

finalRun.py

Python
Main lane detection code needed for our vehicles behavior on the track.
import cv2
import numpy as np
import math
import time
from gpiozero import PWMOutputDevice

import matplotlib.pyplot as plt
import signal
import sys


encoder_path = "/sys/module/gpiod_driver/parameters/encoder_interval_ns"
encoder_values = []
# smaller value corresponds to faster rotation
max_rotate = 5000000
min_rotate = 8500000
delta = 0.001
frame_numbers = []
errors = []
steering_pwms = []
speed_pwms = []


proportional_responses = []
derivative_responses = []
motor_pwm = PWMOutputDevice(19, frequency = 50)
steering_pwm = PWMOutputDevice(18, frequency = 50)
# Signal handler function to gracefully stop the program and plot collected data
def signal_handler(sig, frame):
    print("\n[INFO] Ctrl+C detected. Plotting data and exiting...")

    # Set motor and steering PWM values to neutral to stop the car
    motor_pwm.value = .075
    steering_pwm.value = .075

    # Plot the proportional and derivative responses with error
    plot_propdir(proportional_responses, derivative_responses, errors)

    # Plot the PWM signals for speed and steering, along with error
    plot_pwm(speed_pwms, steering_pwms, errors)

    # Exit the program cleanly
    sys.exit(0)

# Register the above signal handler for Ctrl+C (SIGINT)
signal.signal(signal.SIGINT, signal_handler)


# Function to plot proportional and derivative control values along with error
def plot_propdir(p_vals, d_vals, error):
    # Create a figure with a primary y-axis and a secondary y-axis
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals))  # X-axis: time/frame index

    # Plot proportional (P) and derivative (D) values on primary y-axis
    ax1.plot(t_ax, p_vals, '-', label="P-vals")
    ax1.plot(t_ax, d_vals, '-', label="D-vals")

    # Secondary y-axis for error values
    ax2 = ax1.twinx()
    ax2.plot(t_ax, error, '--r', label="Error")  # Dashed red line for error

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error value")

    # Add title and legend, adjust layout and save plot to file
    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()
    plt.savefig("/home/pi/test_pd_plot.png")
    plt.clf()  # Clear the figure to free memory
    return

def plot_pwm(speed_pwms, turn_pwms, error):
    # Create a figure with dual y-axes
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms))  # X-axis: time/frame index

    # Plot speed and steering PWM values on primary y-axis
    ax1.plot(t_ax, speed_pwms, '-', label="Speed")
    ax1.plot(t_ax, turn_pwms, '-', label="Steering")

    # Normalize and plot error on the same primary y-axis for visual comparison
    ax1.plot(t_ax, error / np.max(error), '--r', label="Error")

    ax1.set_xlabel("Frames")
    ax1.set_ylabel("Speed and Steer Duty Cycle")

    # Define secondary axis
    ax2 = ax1.twinx()
    ax2.set_ylabel("% Error Value")  # Optional secondary label

    plt.title("Speed + Steer Duty Cycle + error vs time")
    fig.legend()
    plt.savefig("/home/pi/test_change_steer_plot.png")
    plt.clf()  # Clear the figure after saving  
    return

def convert_to_HSV(frame):
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # cv2.imshow("HSV", hsv)
    return hsv


def detect_edges(frame):
    # lowered the threshold
    lower_blue = np.array([85, 50, 50], dtype="uint8")
    upper_blue = np.array([140, 255, 255], dtype="uint8")
    # this mask will filter out everything but blue
    mask = cv2.inRange(hsv, lower_blue, upper_blue)

    # detect edges
    edges = cv2.Canny(mask, 50, 100)
    #cv2.imshow("edges", edges)
    return edges


def detect_red(frame):
    # low_red = np.array([161, 155, 84])
    # high_red = np.array([179, 255, 255])
    # red_mask = cv2.inRange(frame, low_red, high_red)
    # lower mask (0-10)
    lower_red = np.array([0, 50, 50])
    upper_red = np.array([10, 255, 255])
    mask0 = cv2.inRange(frame, lower_red, upper_red)

    # upper mask (170-180)
    lower_red = np.array([170, 50, 50])
    upper_red = np.array([180, 255, 255])
    mask1 = cv2.inRange(frame, lower_red, upper_red)

    # join my masks
    red_mask = cv2.bitwise_or(mask0, mask1)
    # red = cv2.bitwise_and(frame, frame, mask = red_mask)

    countRed = np.count_nonzero(red_mask)
    # if countRed > 0:
    # print("Red value: ",countRed)
    if countRed > 10:
        return True
    else:
        return False


def detect_green(frame):
    low_green = np.array([25, 52, 72])
    high_green = np.array([102, 255, 255])

    green_mask = cv2.inRange(frame, low_green, high_green)
    # red = cv2.bitwise_and(frame, frame, mask = red_mask)

    countGreen = np.count_nonzero(green_mask)
    # print("Green value: ",countGreen)
    if countGreen > 110:
        return True
    else:
        return False
def manage_speed():
    f = open(encoder_path, "r")
    encode_val = int(f.readline())
    f.close()
    
    encoder_values.append(encode_val)
    print("Calculated speed")
    print(encode_val)

    if encode_val <= max_rotate:
        print("reduce speed")
        ret_val = -delta
    elif encode_val >= min_rotate:
        print("increase speed")
        ret_val = delta
    else:
        print("maintain speed")
        ret_val = 0  # Default if within range

    return ret_val

def region_of_interest(edges):
    height, width = edges.shape  # extract the height and width of the edges frame
    # make an empty matrix with same dimensions of the edges frame
    mask = np.zeros_like(edges)

    # only focus lower half of the screen
    # specify the coordinates of 4 points (lower left, upper left, upper right, lower right)
    polygon = np.array([[
        (0, height),
        (0,  height/2),
        (width, height/2),
        (width, height),
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)  # fill the polygon with blue color
    cropped_edges = cv2.bitwise_and(edges, mask)
    # cv2.imshow("roi", cropped_edges)
    return cropped_edges


def detect_line_segments(cropped_edges):
    rho = 1
    theta = np.pi / 180
    min_threshold = 10
    line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
                                    np.array([]), minLineLength=5, maxLineGap=0)
    return line_segments


def make_points(frame, line):
    height, width, _ = frame.shape
    slope, intercept = line
    y1 = height  # bottom of the frame
    y2 = int(y1 / 2)  # make points from middle of the frame down

    if slope == 0:
        slope = 0.1

    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]


def average_slope_intercept(frame, line_segments):
    lane_lines = []

    if line_segments is None:
        # print("no line segment detected")
        return lane_lines

    height, width, _ = frame.shape
    left_fit = []
    right_fit = []
    boundary = 1/3

    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            if x1 == x2:
                # print("skipping vertical lines (slope = infinity)")
                continue

            fit = np.polyfit((x1, x2), (y1, y2), 1)
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    left_fit_average = np.average(left_fit, axis=0)
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, left_fit_average))

    right_fit_average = np.average(right_fit, axis=0)
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, right_fit_average))

    # lane_lines is a 2-D array consisting the coordinates of the right and left lane lines
    # for example: lane_lines = [[x1,y1,x2,y2],[x1,y1,x2,y2]]
    # where the left array is for left lane and the right array is for right lane
    # all coordinate points are in pixels
    return lane_lines


def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):  # line color (B,G,R)
    line_image = np.zeros_like(frame)

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(line_image, (x1, y1), (x2, y2),
                         line_color, line_width)

    line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
    return line_image


def get_steering_angle(frame, lane_lines):
    height, width, _ = frame.shape

    if len(lane_lines) == 2:  # if two lane lines are detected
        # extract left x2 from lane_lines array
        _, _, left_x2, _ = lane_lines[0][0]
        # extract right x2 from lane_lines array
        _, _, right_x2, _ = lane_lines[1][0]
        mid = int(width / 2)
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height / 2)
        print("both line")

    elif len(lane_lines) == 1:  # if only one line is detected
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height / 2)
        print("1 line")

    elif len(lane_lines) == 0:  # if no line is detected
        x_offset = 0
        y_offset = int(height / 2)
        print("No line detected")

    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    steering_angle = angle_to_mid_deg + 90

    return steering_angle


def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):

    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)

    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image

def set_duty_cycle(device, duty_cycle_percent):
        """Set PWM device value based on duty cycle percentage."""
        device.value = duty_cycle_percent / 100
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):

    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    steering_angle_radian = steering_angle / 180.0 * math.pi
    x1 = int(width / 2)
    y1 = height
    x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)

    heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)

    return heading_image


# Variables to be updated each loop
lastTime = 0
lastError = 0

# PWM pin


neutral_speed_pwm = 0.075
# Car only moves when at .08 threshold.
base_speed_pwm = neutral_speed_pwm + 0.005

def get_pid_control(deviation, last_error, integral, last_time):
    # Current time
    current_time = time.time()
    delta_time = current_time - last_time

    # Proportional term
    P = Kp * deviation

    # Integral term (accumulating error over time)
    integral += deviation * delta_time
    I = Ki * integral

    # Derivative term (change of error over time)
    D = Kd * (deviation - last_error) / delta_time if delta_time > 0 else 0

    # Compute PID output
    pid_output = P + I + D

    # Return new PID values
    return pid_output, last_error, integral, current_time
if __name__ == "__main__":
    video = cv2.VideoCapture(0)
    video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
    video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)

    # Variables to be updated each loop
    last_time = time.time()  # Last time the PID control was updated
    last_error = 0  # Last deviation
    integral = 0  # Integral of error
    rotbase = 7.5
    lastTime = 0
    lastError = 0
    j= 0

    # PID constants (Adjusted for noisy input)
    Kp = 0.1   # Lower proportional constant to avoid overshoot due to noise
    Ki = 0.02  # Small integral constant to prevent drift without amplifying noise
    Kd = 0.6   # Higher derivative constant to better dampen the effect of noise and rapid fluctuations


    #Adding data points
    der_r = np.array([])
    pro_r = np.array([])
    err = np.array([])
    steering = np.array([])
    speed = np.array([])

    curr_speed = .075


    # === ESC Calibration ===
    print("Calibrating ESC...")
    set_duty_cycle(motor_pwm, 10)   # Max motor_pwm
    time.sleep(2)
    set_duty_cycle(motor_pwm, 5)    # Mid motor_pwm
    time.sleep(2)
    set_duty_cycle(motor_pwm, 7.5)  # Neutral
    time.sleep(2)
    #stop_motors()
    time.sleep(1)

    frame_no = -1
    # PI constants
    kpr=5
    kdr=8
    counter = 0
    try:
        while True:
            frame_no += 1
            ret, frame = video.read()
            frame = cv2.resize(frame, (150, 100), interpolation=cv2.INTER_CUBIC)

            hsv = convert_to_HSV(frame)
            edges = detect_edges(hsv)
            roi = region_of_interest(edges)
            line_segments = detect_line_segments(roi)
            lane_lines = average_slope_intercept(frame, line_segments)
            lane_line_image = display_lines(frame, lane_lines)
            steering_angle = get_steering_angle(frame, lane_lines)

            deviation = steering_angle - 90  # Calculate the deviation from the center (90 degrees)
            errors.append(deviation)
            frame_numbers.append(frame_no)

            # PD control
            now = time.time()
            dovert = now - lastTime if lastTime > 0 else 0.01  # Avoid division by zero

            dv_r = kdr * (deviation - lastError) / dovert
            pr_r = kpr * deviation

            # Store responses for plotting
            proportional_responses.append(pr_r)
            derivative_responses.append(dv_r)

            # Save steering and speed for plotting
            steering_pwms.append(steering_pwm.value * 100)  # Convert back to % for readability
            speed_pwms.append(motor_pwm.value * 100)



            # Apply PID control to smooth the steering
            pid_output, last_error, integral, last_time = get_pid_control(deviation, last_error, integral, last_time)


            # Steering control logic using the PID output
            # print(f"Steering Angle: {steering_angle}, Deviation: {deviation}, PID Output: {pid_output}")

            cv2.imshow('Edges', edges)
            final_output = display_heading_line(lane_line_image, steering_angle)
            cv2.imshow("Lane Detection Output", final_output)
            #deviation calculation for P and D
            now = time.time()
            dt = now - lastTime
            deviation = steering_angle - 90

            counter = counter + 1
            if deviation < 3 and deviation > -3: #Min margin before turning
                #Find new derivative and proportional gain
                deviation = 0
                derivative_r = kdr * (deviation - lastError) / dt
                proportional_r = kpr * deviation

                PD_r = derivative_r + proportional_r
                rot = rotbase - PD_r/90
                steering_pwm.value = 7.5/100
                # print("rot is")
                # print(rot)
                if counter % 3 == 0:
                    temp_speed = manage_speed() + curr_speed
                    print('Temp speed')
                    print(temp_speed)
                    if temp_speed != curr_speed:
                        if temp_speed >= .088:
                            temp_speed = .088
                            motor_pwm.value = .088
                        elif temp_speed <= .075:
                            temp_speed = .075
                            motor_pwm.value = .075
                        else:
                            motor_pwm.value = temp_speed
                        curr_speed = temp_speed


                motor_pwm.value =  curr_speed
            else:
                #Find new derivative and proportional gain
                derivative_r = kdr * (deviation - lastError) / dt
                proportional_r = kpr * deviation
                PD_r = derivative_r + proportional_r
                rot = rotbase + PD_r/90
                # print("rot is")
                # print(rot)
                #Making sure rot stay in the correct duty cycle range.
                if rot>9.5:
                    rot = 9.5
                elif rot<5.5:
                    rot = 5.5
                steering_pwm.value = rot/100


                if counter % 3 == 0:
                    temp_speed = manage_speed() + curr_speed
                    print('Temp speed')
                    print(temp_speed)

                    if temp_speed != curr_speed:
                        print("Speed mismatch")
                        if temp_speed >= .088:
                            temp_speed = .088
                            motor_pwm.value = .088
                        elif temp_speed <= .075:
                            temp_speed = .075
                            motor_pwm.value = .075
                        else:
                            motor_pwm.value = temp_speed

                        curr_speed = temp_speed
                print("curr_speed")
                print(curr_speed)
                motor_pwm.value =  curr_speed


            key = cv2.waitKey(1)
            time.sleep(.025)
            if key == 27:  # ESC key pressed
                # plot_encoder_values()
                motor_pwm.value = .075
                steering_pwm.value = .075

                plot_propdir(proportional_responses, derivative_responses, errors)
                plot_pwm(speed_pwms, steering_pwms, errors)
                break

    finally:
        # plot_control_vs_frame()
        # plot_pid_components()
        video.release()
        cv2.destroyAllWindows()

gpiod_driver.c

C/C++
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/timekeeping.h>
#include <linux/ktime.h>
#include <linux/moduleparam.h>
#include <linux/hrtimer.h>
#include <linux/of.h>

/**
* Code from:  Haochen(Jason) Zhang, Ye Zhou, Konstantinos Alexopoulos
* Class: COMP 424 ELEC 424/553 001
* Final Project: Autonomous Car
* Fall 2023
*
* Interrupt code drawn from:
* https://github.com/Johannes4Linux/Linux_Driver_Tutorial/blob/main/11_gpio_irq/gpio_irq.c
**/

// Global variables
unsigned int gpio_irq_number;
static struct gpio_desc *my_enc;
static volatile u64 timestamp_previous_ns;
static volatile u64 timestamp_current_ns;

static int encoder_interval_ns;
module_param(encoder_interval_ns, int, 0644);

// Interrupt handler for GPIO rising edge
static irq_handler_t gpio_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {

	// Ensure rising edges are spaced more than 1ms apart
	int gpio_state = gpiod_get_value(my_enc); 
	timestamp_current_ns = ktime_get_ns();
	unsigned long delta_ns = timestamp_current_ns - timestamp_previous_ns;

	if (gpio_state == 1 && delta_ns > 1000000) {
		timestamp_previous_ns = timestamp_current_ns;
		encoder_interval_ns = delta_ns;
		printk(KERN_INFO "gpio_irq_handler - Time delta: %lu ns", delta_ns);
	}
	
	return (irq_handler_t) IRQ_HANDLED; 
}

// Probe function when platform device is bound
static int encoder_probe(struct platform_device *pdev)
{
	struct device *dev = &pdev->dev;
	
	printk(KERN_INFO "gpiod_driver: Module inserted, probe running\n");

	// Get GPIO descriptor from device tree
	my_enc = gpiod_get(dev, "enco", GPIOD_IN);
	if (IS_ERR(my_enc)) {
		printk("encoder_probe - Failed to get GPIO descriptor\n");
		return -1;
	}
	
	// Configure interrupt and debounce time
	gpio_irq_number = gpiod_to_irq(my_enc);
	gpiod_set_debounce(my_enc, 1000000);
	
	if (request_irq(gpio_irq_number, (irq_handler_t) gpio_irq_handler, IRQF_TRIGGER_RISING, "my_gpio_irq", NULL) != 0) {
		printk("Error!\nFailed to request IRQ number: %d\n", gpio_irq_number);
		return -1;
	}

	timestamp_previous_ns = ktime_get_ns();
	printk("encoder_probe - SUCCESS\n");
	return 0;
}

// Cleanup function on device unbind
static void encoder_remove(struct platform_device *pdev)
{
	free_irq(gpio_irq_number, NULL);
	printk("encoder_remove - IRQ freed, module removed\n");
	return;
}

static struct of_device_id encoder_dt_ids[] = {
    { .compatible = "a_second_name", },
	{},
};

static struct platform_driver encoder_driver = {
	.probe	 = encoder_probe,
	.remove	 = encoder_remove,
	.driver	 = {
	       .name  = "gpio_encoder_driver",
	       .owner = THIS_MODULE,
	       .of_match_table = encoder_dt_ids,
	},
};
module_platform_driver(encoder_driver);

MODULE_ALIAS("platform:gpio_encoder_driver");
MODULE_DESCRIPTION("Team Team");
MODULE_AUTHOR("LeonardoN, ZhongqiG, ZainN, CamdenG");
MODULE_LICENSE("GPL v2");

Credits

Zain Nazir
1 project • 0 followers
Camden Graham
1 project • 0 followers
Zhongqi Gao
1 project • 0 followers
Leonardo Nunez
1 project • 0 followers

Comments