The Turkish Trio - Self Driving RC Car

This RC car system will successfully self-drive on a track by staying in between lanes, stopping at "Red Lights" and correcting speed.

IntermediateFull instructions provided12 hours87
The Turkish Trio - Self Driving RC Car

Things used in this project

Hardware components

Raspberry Pi 5
Raspberry Pi 5
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Optical Speed Encoder
×1
Standard LCD - 16x2 White on Blue
Adafruit Standard LCD - 16x2 White on Blue
×1
RC Car
Includes the DC Motor used.
×1
SG90 Micro-servo motor
SG90 Micro-servo motor
Used the built in servo motors might now be this one.
×1
Portable Charger
×1

Software apps and online services

VS Code
Microsoft VS Code
OpenCV
OpenCV

Hand tools and fabrication machines

Tape, Duct
Tape, Duct

Story

Read more

Schematics

Device Tree File

This is for registering the GPIO pin 17 for encoder readings. This file is then compiled and added to the root of the device tree.

Code

Main Python Code

Python
This is the main python code that compiles all the elements needed for lane detection, stop sign detection, speed encoding, and plotting data.
# Autonomous Lane-Keeping Car with Dual-Lane Detection and Video Output
# Adapted from prior ELEC 424 projects and external sources:
# 
# User raja_961, Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV. Instructables.
# URL: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
#
# Also referenced ELEC 424 past projects posted on Hackster:
# URL: https://www.hackster.io/elec424/projects?sort=published

import RPi.GPIO as GPIO
import time
import cv2
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import subprocess
import re

# Video Playback
fourcc = cv2.VideoWriter_fourcc(*'XVID') #Writes the video 
out_transformed = cv2.VideoWriter('processed_output.avi', fourcc, 15.0, (320, 240)) #Transforms output for our specs

# GPIO Setup
DRIVE_PIN = 18 #Main motor set to pin 18 for PWM
SERVO_PIN = 19 #Servo motor set to pin 19 for PWM
FREQ = 50 #Set PWM Frequency to be 50Hz
NEUTRAL_DUTY = 7.5 #Defualt to 7.5 duty cycle used for making car and servo motors neutral
CONSTANT_DRIVE_DUTY = 7.99 #For constant driving instead of Decoder for testing
TOLERANCE = 0.001 #Constant for upping/lowering the speed
ENCODER_READ_INTERVAL = 10
frame_num = 0
TARGET_SPEED = 7.99 # Target speed in pulses/s

GPIO.setmode(GPIO.BCM) 
GPIO.setup(DRIVE_PIN, GPIO.OUT) #Initialize drive 18 pin to an output
GPIO.setup(SERVO_PIN, GPIO.OUT) #Initialize servo 19 pin to an output 

drive = GPIO.PWM(DRIVE_PIN, FREQ) #Initialize PWM Output for pin 18
servo = GPIO.PWM(SERVO_PIN, FREQ) #Initialize PWM Output for pin 19
drive.start(NEUTRAL_DUTY) #Default the drive motor to neutral so it is idle
servo.start(NEUTRAL_DUTY) #Default the servo motor to neutral so the car is facing forward and not turned

#PD Controller Variables
KP = 0.07 #Proportional Gain Value
KD = 0.08 #Derivative Gain Value
last_error = 0 #Error flag 
CAMERA_OFFSET = 0 #USED FOR TESTING, had camera offset in case because camera initially tilted

#Stop Detection Variables
first_stop_done = False #Stop sign initializtion flag
stopping = False #Final stop sign initialization flag 
stop_check_interval = 3 
stop_detection_cooldown = 0
COOLDOWN_FRAMES = 300 #How many frames the red color is masked for before Pi detects red again, so you can move past first stop sign
RESUME_DELAY = 5.0 # How long until you start moving again after stopping the first time
frame_counter = 0 #Frame flag to count how many frames it has been 

#Data Logger for plots!!!
log_data = {
    "frame": [],
    "error": [],
    "steering_duty": [],
    "proportional": [],
    "derivative": [],
    "speed": []
}

#All Image processig for lane detection and red light detection
def process_image(frame):
    global first_stop_done, stopping, frame_counter, stop_detection_cooldown #Initialize all the variables defined above in the function

    frame = cv2.resize(frame, (320, 240)) #Set resolution for camera to be 320x240
    lane_center = 160 #Half of 320 so center of frame is that
    centers = [] #

    #Stop Sign detection (HSV red in bottom region)
    stop_detected = False #Initially stop not detected
    if not stopping and stop_detection_cooldown == 0 and frame_counter % stop_check_interval == 0: 
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) #convert image to HSV  color space

        #Intervals for red. Red wraps around HSV values so need two intervals
        lower_red1 = np.array([0, 100, 100])
        upper_red1 = np.array([10, 255, 255])
        lower_red2 = np.array([160, 100, 100])
        upper_red2 = np.array([180, 255, 255])


        hsv_roi = hsv[190:240, :] #focus on bottom 50 pixels of the image for the stop sign 

        mask1 = cv2.inRange(hsv_roi, lower_red1, upper_red1) #red mask for first HSV interval
        mask2 = cv2.inRange(hsv_roi, lower_red2, upper_red2) #red mask for second HSV interval
        red_mask = cv2.bitwise_or(mask1, mask2) #Combine the two masks for a complete mask 

        cv2.imwrite("y-red_mask.jpg", red_mask) #Write an image every run to be used for debugging making sure we pick up contours

        red_area = np.sum(red_mask) / 255 # counting number of red pixels in the image
        stop_detected = red_area > 3000 #if teh number is greater than threshold you have found stop sign 

    #blue lane detection in HSV for lane detection 
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) #convert image to HSV for blue 

    #Interval for blue mask so we can do lane detection in HSV
    lower_blue = np.array([80, 30, 20])   
    upper_blue = np.array([140, 255, 255])
    blue_mask = cv2.inRange(hsv, lower_blue, upper_blue) #Blue mask for lane detetcion

    cv2.imwrite("y-blue_mask.jpg", blue_mask) # Write an image every run to be used for debugging making sure we pick up blue contours
    roi = blue_mask[100:240, :] #focus on lower half of image where lanes are
    contours, _ = cv2.findContours(roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) #find the contours in the image for the blue mask
    contours = sorted(contours, key=cv2.contourArea, reverse=True)[:2] #Take the two biggest contours and store them

    for c in contours:
        M = cv2.moments(c) #Using the contours in the image calculate the center of the contours
        if M['m00'] > 0:
            cx = int(M['m10'] / M['m00']) #Calculating centroid of each contour 
            cy = int(M['m01'] / M['m00']) + 120 #Accounting for cropped detection of lower half
            centers.append((cx, cy)) #append the contours
            cv2.circle(frame, (cx, cy), 5, (0, 255, 0), -1) #Draw green dot in the centroid of the line
            cv2.drawContours(frame, [c + [0, 120]], -1, (255, 0, 0), 2) # draw blue outline for the contour for detection

    if len(centers) == 2: #If two lanes detected then computing lane center between the two 
        lane_center = int((centers[0][0] + centers[1][0]) / 2) #Take the two centroids and determine center 
        cv2.line(frame, (lane_center, 120), (lane_center, 240), (0, 0, 255), 2) #Draw red line to show projected center line for car
        cv2.putText(frame, f'Lane Center: {lane_center}', (lane_center - 60, 115), #Put a text showing the pixel number that is center 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    error = lane_center - (160 + CAMERA_OFFSET) #horizontal lane error between calculated center and actual center, camera offset in zero but was previously used for debugging camera tilt
    out_transformed.write(frame) #use the frame for video playback 
    return error, stop_detected #return the error and if a stop is detected

def update_steering(error, frame_num):
    global last_error #Using the error from process_image we can update for correction of steering

    #PD Controller Logic
    proportional = KP * error #proportional part of the PD controller, Proportional gain and error mutiplied
    derivative = KD * (error - last_error) #Derivative part of PD controller, Derivative gain multiplied by the derivative error
    control_signal = proportional + derivative #add the proportional and derivative parts together to get complete PD
    duty_cycle = NEUTRAL_DUTY + control_signal #Update duty cycle based on a initial duty cycle of 7.5
    duty_cycle = max(6.0, min(9.0, duty_cycle)) #The max the car can turn is 9 and 6 so cap the values at the interval

    if duty_cycle == 6.0 or duty_cycle == 9.0: #If the number calculated for duty cycle exceeds interval print error for debugging
        print(f"[SATURATED] Frame {frame_num}, Error: {error}")

    servo.ChangeDutyCycle(duty_cycle) #Update the servo for correction with the new duty cycle

    #Log all the data calculated for plots being the error,frame number, duty cycle, P and D values, and the speed of the car
    log_data["frame"].append(frame_num)
    log_data["error"].append(error)
    log_data["steering_duty"].append(duty_cycle)
    log_data["proportional"].append(proportional)
    log_data["derivative"].append(derivative)
    log_data["speed"].append(speed)

    last_error = error #Update the error flag for next cycke
    return duty_cycle

def drive_forward():
    drive.ChangeDutyCycle(CONSTANT_DRIVE_DUTY) #Car updates PWM with the variable

def stop_drive():
    drive.ChangeDutyCycle(NEUTRAL_DUTY) #Reset PWM to neutral 

#Old function for saving data for logs CSV file, instead switching to outputting plots automatically
# def save_logs():
#     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
#     with open(f"run_{timestamp}.csv", "w") as f:
#         f.write("frame,error,steering_duty,proportional,derivative,speed\n")
#         for i in range(len(log_data["frame"])):
#             f.write(f"{log_data['frame'][i]},{log_data['error'][i]},{log_data['steering_duty'][i]},"
#                     f"{log_data['proportional'][i]},{log_data['derivative'][i]},{log_data['speed'][i]}\n")

def plot_logs():
    # Plot 1: PD Values over Time
    fig, ax1 = plt.subplots()
    t_ax = np.array(log_data["frame"]) #Frames as x axis
    #P and D values as compared to the frames
    ax1.plot(t_ax, log_data["proportional"], '-', label="P values", color='blue')
    ax1.plot(t_ax, log_data["derivative"], '-', label="D values", color='orange')
    ax2 = ax1.twinx()
    #Error values plotted as well
    ax2.plot(t_ax, log_data["error"], '--r', label="Error")
    #Labels for each of the data logged
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-90, 90)
    ax2.set_ylabel("Error Value")
    #Title the plot and add legend 
    plt.title("PD Values over time")
    fig.legend(loc='upper right')
    fig.tight_layout()
    #Make an output png for the plot
    plt.savefig("pd_plot.png")
    plt.clf()

    # Plot 2: Speed and Steer Duty Cycle, and Error vs. Time
    fig, ax1 = plt.subplots()
    t_ax = np.array(log_data["frame"]) #Frame as x axis
    #Speed, steering duty cycle, and error plotted compared to the frames 
    ax1.plot(t_ax, log_data["speed"], '-', label="Speed", color='blue')
    ax1.plot(t_ax, log_data["steering_duty"], '-', label="Steering", color='orange')
    ax2 = ax1.twinx()
    error = np.array(log_data["error"])
    #Need to normalize the error
    max_error = np.max(np.abs(error))
    if max_error != 0:
        normalized_error = error / max_error
    else:
        normalized_error = error
    #Initialize the error on the plot with label "Error"
    ax2.plot(t_ax, normalized_error, '--r', label="Error")
    #Set labels and title and legend for the rest of the plot
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("Speed and Steer Duty Cycle")
    ax2.set_ylabel("Percent Error Value")
    plt.title("Speed and Steer Duty Cycle, and error v.s. time")
    fig.legend(loc='upper right') #Locate legend in top right part of frame
    fig.tight_layout()
    plt.savefig("voltage_plot.png") #Make a png file for this plot
    plt.clf()
#Actual Script!!!!

# Read kernel log for speed
def read_kernel_log():
    try:
        result = subprocess.run(['dmesg'], capture_output=True, text=True)
        return result.stdout
    except Exception as e:
        print(f"Error reading kernel log: {e}")
        return ""

# Parse speed from kernel log
def parse_speed(log):
    pattern = r"irq_handler - timing detected: \d+ ns, speed: (\d+\.\d+) pulses/s"
    matches = re.findall(pattern, log)
    if matches:
        return float(matches[-1]) # Return the latest speed
    return None

# Adjust speed to target 7.99
def adjust_speed(current_speed):
    global CONSTANT_DRIVE_DUTY
    if current_speed < TARGET_SPEED - TOLERANCE:
        CONSTANT_DRIVE_DUTY += 0.001
        print(f"[INFO] Increasing speed. Current: {current_speed:.2f}, New duty: {CONSTANT_DRIVE_DUTY:.3f}")
    elif current_speed > TARGET_SPEED + TOLERANCE:
        CONSTANT_DRIVE_DUTY -= 0.001
        print(f"[INFO] Decreasing speed. Current: {current_speed:.2f}, New duty: {CONSTANT_DRIVE_DUTY:.3f}")
    else:
        print(f"[OK] Speed is within range: {current_speed:.2f}")
    return CONSTANT_DRIVE_DUTY

try:
    cap = cv2.VideoCapture(0) #Initialize the camera
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320) #Set the frame width to our resolution
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240) #Set frame height to our resolution 
    time.sleep(2) #Have a 2 second delay 

    frame_num = 0
    max_frames = 1000 #If the video was playing too long we used to have the while loop be going for the amount of frames of the max

    while True:
        ret, frame = cap.read() #frame from camera captures
        if not ret:
            continue #In event fram does not capture skip iteration 

        frame_counter += 1 #Still increment frame though!
        if stop_detection_cooldown > 0: #When stop sign detected this if statement used to cooldown so the box does not get picked up again
            stop_detection_cooldown -= 1 #Decrement to reset flag
            print(f"Cooldown: {stop_detection_cooldown}") #Print statement for this

        error, stop_detected = process_image(frame)

        
        if frame_num % ENCODER_READ_INTERVAL == 0: # The line that the speed has been outputted on
            log = read_kernel_log()
            speed = parse_speed(log) # Valid speed
            if speed is not None:
                print(f"Speed from kernel: {speed:.2f} pulses/s")
                adjust_speed(speed) # Something went wrong
            else:
                print("[WARN] No speed reading available")
                speed = CONSTANT_DRIVE_DUTY

        if stop_detected: #When the red stop sign is detected 
            print("Stop box detected!") #Print statement for it
            stopping = True #Change the boolean
            stop_drive() #Stop the car reset the PWM to neutral
            time.sleep(RESUME_DELAY) #Have a delay until the car moves again for a full stop

            if not first_stop_done: #After first stop say when the car resumes driving
                print("Resuming after first stop")
                first_stop_done = True #Set the flag so we know the first stop is concluded
                stopping = False #no longer stopped drive 
                stop_detection_cooldown = COOLDOWN_FRAMES #Have a cooldown to make sure the red does not get detected again before it passes the sign
                drive_forward() #Drive the car
            else:
                print("Final stop, exiting.") #If first stop done is true and you see red again after cooldown frames then stop for good!
                break
        else:
            update_steering(error, frame_num) #Update PWM for servo based on lane detetcion 
            if not stopping:
                drive_forward() #Keep car driving while updating

        cv2.imshow('Live Feed', frame) #Show pocessed video in live feed
        if cv2.waitKey(1) == ord('q'): #If q is pressed you can exit loop early, but did not really work we would just use CTRL^C instead
            break

        frame_num += 1 #Update the frame counter

finally:
    # save_logs()
    plot_logs() #Run the plot functions
    cap.release() #Release the capture from camera
    out_transformed.release() #release the transformed output of camera
    cv2.destroyAllWindows() #Kill the CV 
    stop_drive() #Stop the car to neutral 
    drive.stop() #Stop the driving 
    servo.stop() #Stop the servo 
    GPIO.cleanup() #Clear all pins 

Driver for the Optical Encoder

C/C++
This is the c file of the driver which reads the encoder values from the kernel side using gpiod library. Speed of the car is calculated and logged to the kernel. The kernel log is read from the main python file to get the speed
#include <linux/module.h>
#include <linux/init.h>
#include <linux/platform_device.h>
#include <linux/of_device.h>
#include <linux/mod_devicetable.h>
#include <linux/gpio/consumer.h>
#include <linux/interrupt.h>
#include <linux/ktime.h>
#include <linux/fs.h>
#include <linux/file.h>

#define DEBOUNCE_MS 5                // Debounce time in milliseconds
#define ENCODER_GPIO_NAME "encoder"  // Matches "encoder-gpios" in DTS
#define PULSES_PER_REVOLUTION 360    // Example: 360 pulses per wheel revolution
#define WHEEL_CIRCUMFERENCE 0.2      // Example: 0.2 meters per revolution
#define ENCODER_DATA_FILE "/tmp/encoder_data" // File to write encoder data
#define SCALE_FACTOR 1000            // Scale factor for fixed-point arithmetic (e.g., 0.123 m/s -> 123)

static struct gpio_desc *encoder_gpio;
static unsigned int irq_number;
static ktime_t last_pulse_time;
static unsigned long pulse_count = 0;
static ktime_t last_rate_calc_time;
static unsigned long pulses_since_last_rate = 0;

// Function to calculate and log speed
static void log_encoder_speed(unsigned long pulses, s64 elapsed_us)
{
    unsigned long pulse_rate_int, car_speed_int;

    // Calculate pulse rate (pulses per second, scaled)
    pulse_rate_int = (pulses * 1000000UL) / elapsed_us; // Integer pulses per second
    pulse_rate_int *= SCALE_FACTOR; // Scale for precision

    // Calculate speed (m/s, scaled)
    car_speed_int = (pulse_rate_int / PULSES_PER_REVOLUTION) * (unsigned long)(WHEEL_CIRCUMFERENCE * SCALE_FACTOR);

    // Log to kernel
    pr_info("irq_handler - timing detected: speed: %d pulses/s\n", car_speed_int);
}

// Interrupt handler for encoder
// This function is called when the encoder is triggered
static irqreturn_t encoder_isr(int irq, void *dev_id)
{
    ktime_t now = ktime_get();
    s64 delta_us = ktime_us_delta(now, last_pulse_time);

    // Debounce: ignore pulses closer than DEBOUNCE_MS
    if (delta_us > (DEBOUNCE_MS * 1000)) {
        pulse_count++;
        pulses_since_last_rate++;
        last_pulse_time = now;

        // Check if 1 second has elapsed
        s64 elapsed_us = ktime_us_delta(now, last_rate_calc_time);
        if (elapsed_us >= 1000000) {  // 1 second
            // Call log_encoder_speed directly
            log_encoder_speed(pulses_since_last_rate, elapsed_us);

            pulses_since_last_rate = 0;
            last_rate_calc_time = now;
        }
    }

    return IRQ_HANDLED;
}

// Probe function
// This function is called when the driver is loaded
static int encoder_probe(struct platform_device *pdev)
{
    int ret;

    // Get encoder GPIO using libgpiod
    encoder_gpio = devm_gpiod_get(&pdev->dev, ENCODER_GPIO_NAME, GPIOD_IN);
    if (IS_ERR(encoder_gpio)) {
        pr_err("Failed to get encoder GPIO: %ld\n", PTR_ERR(encoder_gpio));
        return PTR_ERR(encoder_gpio);
    }

    // Set debounce (in microseconds)
    ret = gpiod_set_debounce(encoder_gpio, DEBOUNCE_MS * 1000);
    if (ret) {
        pr_err("Failed to set debounce for encoder GPIO\n");
        return ret;
    }

    // Get IRQ number
    irq_number = gpiod_to_irq(encoder_gpio);
    if (irq_number < 0) {
        pr_err("Failed to get IRQ for encoder GPIO: %d\n", irq_number);
        return irq_number;
    }

    // Request IRQ
    ret = request_irq(irq_number, encoder_isr, IRQF_TRIGGER_RISING, "encoder_irq", NULL);
    if (ret) {
        pr_err("Failed to request IRQ %d: %d\n", irq_number, ret);
        return ret;
    }

    // Initialize timing variables
    last_pulse_time = ktime_get();
    last_rate_calc_time = last_pulse_time;

    pr_info("Encoder driver initialized\n");
    return 0;
}

// Remove function
// This function is called when the driver is removed
static void encoder_remove(struct platform_device *pdev)
{
    free_irq(irq_number, NULL);
    pr_info("Encoder driver unloaded\n");
}

// Match table for encoder
// This table is used to match the driver with the correct device
static const struct of_device_id encoder_match_table[] = {
    { .compatible = "thechosen" },
    {},
};
MODULE_DEVICE_TABLE(of, encoder_match_table);

// Platform driver for encoder
// It is for connecting the method implementations to the function pointers in the kernel
static struct platform_driver encoder_driver = {
    .probe = encoder_probe,
    .remove = encoder_remove,
    .driver = {
        .name = "encoder_driver",
        .owner = THIS_MODULE,
        .of_match_table = encoder_match_table,
    },
};

module_platform_driver(encoder_driver);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("ELEC 424 Team");
MODULE_DESCRIPTION("Optical encoder driver for speed control");

Device Tree File for the Encoder

Plain text
This is for registering the GPIO pin 17 for encoder readings. Thefile is then compiled and added to the root of the device tree.
/dts-v1/;
/plugin/;

/ {
    fragment@0 {
        target-path ="/";
        __overlay__ {
            encoder {
                compatible = "thechosen";
                encoder-gpios = <&gpio 17 0>;
            };
        };
    };
};

Credits

Reed Gallant
1 project • 0 followers
Tom Vincent
1 project • 0 followers
Harun Ustaoglu
1 project • 0 followers
Anjali Yamasani
1 project • 0 followers

Comments