Youjia Tong, Beiming Zhang, Aniket Jadhav
Published

Vision-Based Autonomous Lane Keeping Vehicle

Autonomous RC car with lane detection, PD steering, encoder speed control, and object detection.

Intermediate · Showcase (no instructions) · 41
Vision-Based Autonomous Lane Keeping Vehicle

Things used in this project

Hardware components

Raspberry Pi 5
Raspberry Pi 5
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Optical encoder
×1
Raspberry Pi 5 Power Adapter, Battery, Power Bank
×1
RC car
×1

Software apps and online services

OpenCV
OpenCV

Hand tools and fabrication machines

Scissor, Electrician
Scissor, Electrician
Servo Motor, Premium Male/Male Jumper Wires
Servo Motor, Premium Male/Male Jumper Wires
Tape

Story

Read more

Code

run.py

Python
# ==============================================================================
# ELEC 553 Final Project - Main Autonomous Driving Script
# 
# This script handles the main autonomous loop for the RPi 5 RC Car. 
# It reads camera frames to perform HSV color filtering for lane keeping (blue) and stop box detection (red).
# It utilizes a PD controller for smooth steering, and reads hardware encoder interrupts via a sysfs file to
# dynamically maintain a constant motor speed.
#
# Citation: User raja_961, "Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV"
# URL: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# ==============================================================================

import cv2
import numpy as np
import lgpio
import time
import csv

# 
# HARDWARE PIN SETUP
# 
STEER_PIN = 19  # BCM pin carrying the steering servo PWM signal
MOTOR_PIN = 18  # BCM pin carrying the ESC (drive motor) PWM signal
GPIO_CHIP = 4  # RPi 5 utilizes gpiochip4 for its hardware IO

# 
# PWM & ESC CALIBRATION CONSTANTS
# 
# Duty cycles: 7.5 = Neutral, >7.5 = Forward/Right, <7.5 = Reverse(motor doesnt seem to go backwards)/Left
STEER_CENTER = 7.5
STEER_MIN = 6.2  # Physical left limit
STEER_MAX = 8.8  # Physical right limit
MOTOR_STOP = 7.5
BASE_SPEED = 8.0  # baseline forward speed
PWM_FREQ = 50     # 50 Hz standard for RC servos and ESCs

# 
# PD CONTROLLER GAINS (Steering)
# 
# Higher Kp -> sharper, more aggressive turns.
Kp = 0.02  
# Higher Kd -> more damping, less oscillation/shaking.
Kd = 0.002  

# 
# STEERING SMOOTHING (Low-Pass Filter)
# 
# Blend weight given to the PREVIOUS steering value:
# 0.0 -> Instant response;
# 1.0 -> Completely ignores new calculations
SMOOTH = 0.6

# 
# ENCODER & CRUISE CONTROL CONSTANTS
# 
# We read the custom kernel driver's output from the sysfs virtual file system
ENCODER_SYSFS = "/sys/module/encoder_driver/parameters/encoder_rate" # note that the vfs gives rates not intervals
TARGET_INTERVAL = 9     # Target milliseconds between encoder ticks
SPEED_STEP = 0.003      # Step size to adjust PWM to maintain target speed


# 
# lgpio INITIALIZATION
# 
# Open the GPIO chip and claim both PWM pins as outputs.
gpio_handle = lgpio.gpiochip_open(GPIO_CHIP)

lgpio.gpio_claim_output(gpio_handle, STEER_PIN)
lgpio.gpio_claim_output(gpio_handle, MOTOR_PIN)

# Helper function to cleanly send PWM signals to a specified pin.
def set_pwm(pin, duty_cycle):
    """Drive `pin` with a PWM_FREQ (50 Hz) signal at `duty_cycle` percent."""
    lgpio.tx_pwm(gpio_handle, pin, PWM_FREQ, duty_cycle)

# Initialize both channels to neutral so the car stays still while the ESC arms
set_pwm(STEER_PIN, STEER_CENTER)
set_pwm(MOTOR_PIN, MOTOR_STOP)
print("Waiting for ESC to initialize...")
time.sleep(2)

# 
# CAMERA INITIALIZATION
# 
# V4L2 backend; low resolution keeps per-frame HSV processing cheap on the Pi.
cap = cv2.VideoCapture(0, cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 240)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 180)

# NOTE(review): isOpened() is checked after the set() calls; set() on a closed
# capture is harmless, but checking first would read more clearly.
if not cap.isOpened():
    print("Camera failed to open!")
    lgpio.gpiochip_close(gpio_handle)
    exit()

# 
# DATA LOGGING SETUP
# 
LOG_FILE = "/home/pi/run_data.csv"  # default CSV destination for per-frame telemetry

# Initializes the CSV file with column headers for graphing later.
def init_csv(path=None):
    """Create (or overwrite) the telemetry CSV and write its header row.

    Args:
        path: Destination CSV file. Defaults to LOG_FILE; parameterized so
            callers/tests can log elsewhere without touching the module constant.
    """
    if path is None:
        path = LOG_FILE
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow(
            [
                "Frame",
                "Error",
                "Steer_PWM",
                "Motor_PWM",
                "Prop_Response",
                "Deriv_Response",
                "Encoder_Interval_ms",
            ]
        )

# Appends a single frame's calculated states to the CSV file.
def log_row(frame_num, error, steer_pwm, motor_pwm, prop, deriv, enc, path=None):
    """Append one frame's controller state to the telemetry CSV.

    Args:
        frame_num: Sequential frame counter.
        error: Lane-center pixel error; None (lane lost) is logged as 0.
        steer_pwm: Steering duty cycle actually commanded.
        motor_pwm: Motor duty cycle actually commanded.
        prop: Proportional term of the PD controller.
        deriv: Derivative term of the PD controller.
        enc: Encoder interval reading (ms) for this frame.
        path: Destination CSV file; defaults to LOG_FILE.
    """
    if path is None:
        path = LOG_FILE
    with open(path, "a", newline="") as f:
        csv.writer(f).writerow(
            [
                frame_num,
                round(error, 3) if error is not None else 0,
                round(steer_pwm, 4),
                round(motor_pwm, 4),
                round(prop, 4),
                round(deriv, 4),
                enc,
            ]
        )

# 
# HARDWARE ENCODER READER & SPEED CONTROL
# 

# Helper to get the current millisecond interval between encoder ticks from the kernel driver.
def read_encoder(path=None):
    """Return the interval in ms between encoder ticks, or 0 if unavailable.

    The kernel driver exports a rate in ticks/second; this converts it to a
    millisecond interval. Returns 0 (caller treats as "hold current speed")
    when the sysfs file is missing/unreadable, the value is malformed, or
    the reported rate is 0.

    Args:
        path: sysfs file to read; defaults to ENCODER_SYSFS.
    """
    if path is None:
        path = ENCODER_SYSFS
    try:
        with open(path, "r") as f:
            return 1000 / int(f.read().strip())  # Convert rate (ticks/sec) to interval (ms)
    except (OSError, ValueError, ZeroDivisionError):
        # Narrowed from a bare `except:` so genuine bugs (e.g. NameError)
        # are no longer silently swallowed; rate 0 lands here via the
        # ZeroDivisionError branch.
        return 0

# Dynamically adjusts the motor's duty cycle to maintain a constant physical
# speed regardless of battery drain or track friction.
def adjust_speed(current_dc, interval, target=None, step=None, dc_min=8.0, dc_max=8.2):
    """Nudge the motor duty cycle toward the target encoder tick interval.

    Args:
        current_dc: Current motor PWM duty cycle.
        interval: Measured ms between encoder ticks (0 means "no reading").
        target: Desired tick interval in ms; defaults to TARGET_INTERVAL.
        step: Duty-cycle change applied per call; defaults to SPEED_STEP.
        dc_min: Lower clamp so the car never stalls (default 8.0).
        dc_max: Upper clamp so the car never runs away (default 8.2).

    Returns:
        The adjusted duty cycle, clamped to [dc_min, dc_max].
    """
    if target is None:
        target = TARGET_INTERVAL
    if step is None:
        step = SPEED_STEP

    # Ignore noise / missing readings (interval < 1 also covers interval == 0,
    # which the original tested redundantly).
    if interval < 1:
        return current_dc

    # Longer interval than target -> wheel ticking too slowly -> speed up;
    # shorter interval -> too fast -> slow down.
    if interval > target:
        new_dc = current_dc + step
    elif interval < target:
        new_dc = current_dc - step
    else:
        new_dc = current_dc

    # Clamp to a narrow band so the speed stays stable
    return max(dc_min, min(dc_max, new_dc))


# 
# LANE DETECTION
# 

def get_lane_error(frame):
    """Locate the blue lane tape and return (pixel_error, blue_pixel_count).

    pixel_error is the signed horizontal offset of the weighted lane center
    from the image midline, clamped to [-90, 90]; a positive value means the
    lane center lies to the right of the midline. Returns (None, count) when
    no usable line segment is found.
    """
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Threshold for the blue tape, then apply a morphological opening to
    # knock out isolated background speckles.
    mask = cv2.inRange(hsv, np.array([80, 30, 30]), np.array([155, 255, 255]))
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8))

    img_h, img_w = mask.shape

    # Instead of scanning the whole image, sample five thin horizontal bands.
    # Bands nearer the bottom of the frame are physically closer to the car,
    # so they carry a larger weight.
    band_specs = [(0.55, 0.8), (0.65, 1.0), (0.75, 1.2), (0.85, 1.5), (0.92, 1.8)]
    centers = []
    for row_frac, wt in band_specs:
        top = int(img_h * row_frac)
        _, xs = np.where(mask[top : top + 4, :] > 0)
        # A band only counts if it holds enough blue pixels to be a real line
        if len(xs) > 10:
            centers.append((np.mean(xs), wt))

    blue_pixels = cv2.countNonZero(mask)

    if not centers:
        return None, blue_pixels

    # Weighted average of the per-band centers gives the lane center
    total_wt = sum(wt for _, wt in centers)
    lane_center = sum(c * wt for c, wt in centers) / total_wt

    # Signed offset from the camera midline, clamped to guard against
    # extreme values from camera glitches.
    offset = lane_center - (img_w / 2)
    return max(-90.0, min(90.0, offset)), blue_pixels


# 
# STOP BOX DETECTION
# 
def is_red_stop(frame):
    """Checks the bottom portion of the frame for the red stop box."""
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Two HSV windows together cover the red tape's appearance; OR them
    # into a single mask.
    band_a = cv2.inRange(hsv, np.array([9, 36, 149]), np.array([24, 147, 209]))
    band_b = cv2.inRange(hsv, np.array([0, 13, 0]), np.array([11, 250, 255]))
    red_mask = band_a | band_b

    rows, cols = red_mask.shape

    # Region of interest: rows from 10% to 95% of the frame height
    roi = red_mask[int(rows * 0.10) : int(rows * 0.95), :]

    # Fraction of the ROI that is red
    red_fraction = cv2.countNonZero(roi) / (roi.shape[0] * roi.shape[1])

    # Enough red in view -> we are on the stop box
    return red_fraction > 0.15


# ==============================================================================
# DRIVING LOOP (MAIN LOOP)
# ==============================================================================

# Controller state carried across frames
last_error = 0.0           # previous lane error, feeds the derivative term
last_steer = STEER_CENTER  # previous steering duty, feeds the low-pass filter
passed_first_stop = False  # True once the first red box has been handled
last_stop_time = 0         # timestamp of the last stop (re-trigger cooldown)
current_motor = BASE_SPEED
frame_num = 0

init_csv()

try:
    print("Starting  car will move now!")
    set_pwm(MOTOR_PIN, BASE_SPEED)

    while True:
        ret, frame = cap.read()
        if not ret:
            print("Camera read failed")
            break

        frame_num += 1

        # Stop box check
        if is_red_stop(frame):
            # 3 s cooldown prevents immediately re-triggering on the same box
            if time.time() - last_stop_time > 3:
                print(f"[{frame_num}] RED STOP detected")
                log_row(frame_num, 0, STEER_CENTER, MOTOR_STOP, 0, 0, 0)

                # Brake gradually over a few hundred milliseconds. 
                for brake_dc in [7.8, 7.6, 7.5]:
                    set_pwm(MOTOR_PIN, brake_dc)
                    time.sleep(0.1)

                # Hold neutral throttle and center the wheels
                set_pwm(MOTOR_PIN, MOTOR_STOP)
                set_pwm(STEER_PIN, STEER_CENTER)

                # Stop at the first box, terminate at the second box.
                if not passed_first_stop:
                    time.sleep(3)
                    passed_first_stop = True
                    last_stop_time = time.time()
                    set_pwm(MOTOR_PIN, BASE_SPEED)
                    current_motor = BASE_SPEED
                    print("Resuming after first stop...")
                    time.sleep(0.5)
                else:
                    # Second box: leave the loop; cleanup runs in `finally`
                    print("Final stop. Done.")
                    break

        # Lane Detection & Steering Calculation
        error, blue_px = get_lane_error(frame)

        if error is None:
            # If the camera completely loses the lane, hold the wheels straight
            steer_val = STEER_CENTER
            prop = 0.0
            deriv = 0.0
        else:
            # Proportional term: react to how far off-center we currently are
            prop = Kp * error
            # Derivative term: damp oscillation by reacting to error change
            deriv = Kd * (error - last_error)
            
            # NOTE(review): subtracting drives the duty BELOW neutral for a
            # positive error, which per the calibration table above is a LEFT
            # command, yet the original comment claimed positive error needs a
            # larger PWM (right). Confirm against camera/servo orientation.
            raw_steer = STEER_CENTER - (prop + deriv)
            raw_steer = max(STEER_MIN, min(STEER_MAX, raw_steer))

            # Apply a low-pass filter to smooth the servo movement
            steer_val = SMOOTH * last_steer + (1 - SMOOTH) * raw_steer
            
            # Update state variables for the next frame
            last_steer = steer_val
            last_error = error

        # Execute the calculated steering angle
        set_pwm(STEER_PIN, steer_val)

        # Motor Speed Adjustment (encoder-based cruise control)
        enc_interval = read_encoder()
        current_motor = adjust_speed(current_motor, enc_interval)
        set_pwm(MOTOR_PIN, current_motor)

        # Log to file and print data
        log_row(frame_num, error, steer_val, current_motor, prop, deriv, enc_interval)
        print(
            f"[{frame_num}] Err:{error} Steer:{steer_val:.2f} "
            f"Motor:{current_motor:.2f} EncRate:{1000 // enc_interval if enc_interval != 0 else None} Blue:{blue_px}"
        )

        cv2.waitKey(1)
        time.sleep(0.02)

except KeyboardInterrupt:
    print("\nStopped by user via Keyboard Interrupt")

finally:
    # ==============================================================================
    # CLEANUP BLOCK
    # this block ensures the hardware does not run away uncontrollably
    # ==============================================================================
    # Neutral throttle and centered steering before shutting anything down
    set_pwm(MOTOR_PIN, MOTOR_STOP)
    set_pwm(STEER_PIN, STEER_CENTER)
    time.sleep(0.5)
    
    # Stop the PWM output (tx_pwm with frequency 0)
    lgpio.tx_pwm(gpio_handle, MOTOR_PIN, 0, 0)
    lgpio.tx_pwm(gpio_handle, STEER_PIN, 0, 0)
    
    # Release GPIO handle and camera
    lgpio.gpiochip_close(gpio_handle)
    cap.release()
    print(f"Cleanup done. {frame_num} frames logged to CSV.")

encoder_driver.c

C/C++
// ========================================================================
// This file is modified from ELEC 553 Project 2
// ========================================================================

#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>   
#include <linux/ktime.h>       
#include <linux/err.h>         
#include <linux/mod_devicetable.h> 
#include <linux/moduleparam.h> // REQUIRED for module parameters

// Device tree and GPIO variables
static struct gpio_desc *enc_desc;   // descriptor for the encoder input GPIO
static int enc_irq;                  // IRQ number mapped from that GPIO
static ktime_t last_interrupt_time;  // timestamp of the last accepted tick

// --- MODULE PARAMETER SETUP ---
// This will appear at: /sys/module/encoder_driver/parameters/encoder_rate
static int encoder_rate = 0;

// 0444 makes it Read-Only for everyone in sysfs
module_param(encoder_rate, int, 0444);
MODULE_PARM_DESC(encoder_rate, "Current rate of the encoder in ticks per second");

/*
 * INTERRUPT SERVICE ROUTINE
 *
 * Fires on each falling edge of the encoder GPIO. Measures the time since
 * the previous accepted edge and publishes the resulting tick rate
 * (ticks/second) through the encoder_rate module parameter.
 */
static irqreturn_t encoder_isr(int irq, void *dev_id)
{
    ktime_t now = ktime_get();
    s64 elapsed_ns = ktime_to_ns(ktime_sub(now, last_interrupt_time));

    /* Debounce: drop edges arriving within 2 ms (2,000,000 ns) of the
     * last accepted one; the timestamp is intentionally not updated. */
    if (elapsed_ns < 2000000)
        return IRQ_HANDLED;

    /* 1,000,000,000 ns = 1 second. A gap above 0.5 s means the wheel is
     * just starting from a standstill, so report 0 instead of a bogus
     * tiny rate derived from the huge delta. */
    if (elapsed_ns > 500000000)
        encoder_rate = 0;
    else
        encoder_rate = 1000000000 / elapsed_ns;

    last_interrupt_time = now;

    return IRQ_HANDLED;
}

// Probe: runs when the platform device matching "speed,encoder" binds.
// Acquires the encoder GPIO, maps it to an IRQ, and installs the ISR.
static int encoder_probe(struct platform_device *pdev)
{
    // Get the dev from pdev parameter
    struct device *dev = &pdev->dev;
    int ret;

    pr_info("Encoder Driver: Probe started...\n");

    // devm_ variant: the GPIO is released automatically when the device unbinds
    enc_desc = devm_gpiod_get(dev, "encoder", GPIOD_IN);
    if (IS_ERR(enc_desc)) {
        pr_err("Failed to acquire Encoder GPIO\n");
        return PTR_ERR(enc_desc);
    }

    // Map the GPIO to its interrupt line
    enc_irq = gpiod_to_irq(enc_desc);
    if (enc_irq < 0) return enc_irq;

    // Register the IRQ to be triggered on falling edge.
    // NOTE(review): plain request_irq needs the matching free_irq in
    // remove() (present below); devm_request_irq would tie the IRQ to the
    // device lifetime instead.
    ret = request_irq(enc_irq, encoder_isr, IRQF_TRIGGER_FALLING, "encoder_irq", NULL);
    if (ret) return ret;

    // Seed the timestamp so the first measured delta is sane
    last_interrupt_time = ktime_get();
    encoder_rate = 0;

    pr_info("Encoder Driver: Probe complete. Read rate at /sys/module/encoder_driver/parameters/encoder_rate\n");
    return 0;
}

// Remove: runs on driver unbind; releases the IRQ requested in probe.
// NOTE(review): the void-returning .remove prototype requires a recent
// kernel (older kernels expect int) — confirm against the target headers.
static void encoder_remove(struct platform_device *pdev)
{
    free_irq(enc_irq, NULL);
    pr_info("Encoder Driver: Successfully removed.\n");
}

// Match table: binds this driver to the "speed,encoder" node from the DTS
static struct of_device_id matchy_match[] = {
    { .compatible = "speed,encoder" },
    {},  // sentinel terminator required by the OF matching code
};
MODULE_DEVICE_TABLE(of, matchy_match);

// Define platform driver
static struct platform_driver encoder_driver = {
    .probe   = encoder_probe,
    .remove  = encoder_remove,
    .driver  = {
           .name  = "encoder_driver",
           .owner = THIS_MODULE,  // NOTE(review): modern kernels set .owner automatically
           .of_match_table = matchy_match,
    },
};

// Registers the driver and generates the module init/exit boilerplate
module_platform_driver(encoder_driver);

MODULE_DESCRIPTION("Encoder Rate Tracker");
MODULE_AUTHOR("Team Waymoox");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:encoder_driver");

encoder_driver.dts

C/C++
/dts-v1/;
/plugin/;

/ {
    fragment@0 {
        // insert the overlay at the root ("/") of the device tree
        target-path = "/";
        __overlay__ {
            encoder {
                // must match the driver's of_device_id table
                compatible = "speed,encoder";
                // encoder signal on GPIO pin 17; trailing 0 = default flags
                // (presumably active-high — confirm against the sensor board)
                encoder-gpios = <&gpio 17 0 >;
            };
        };
    };

};

object_detect.py

Python
import cv2
import numpy as np
import time

# ==============================================================================
# ELEC 553 Extra Requirement: Object Recognition

# this script uses a MobileNet Single Shot Detector (SSD) which is significantly 
# lighter and faster than YOLO, making it ideal for edge devices like the RPi 5.
# ==============================================================================

# .prototxt: the architecture/layers of the neural network.
# .caffemodel: the pre-trained weights for the network.
prototxt = "deploy.prototxt"
model = "mobilenet_iter_73000.caffemodel"
net = cv2.dnn.readNetFromCaffe(prototxt, model)

# 20 standard classes the MobileNet SSD was trained on; index 0 is the
# "background" class, and the net's class ids index directly into this list
CLASSES = [
    "background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant",
    "sheep", "sofa", "train", "tvmonitor",
]

# only draw boxes for these specific objects
TARGET_CLASSES = {"person", "bottle", "chair", "diningtable", "tvmonitor"}

# 
# CAMERA SETUP
# 
cap = cv2.VideoCapture(0, cv2.CAP_V4L2)
# Set resolution to 320x320 exactly as required by the final project rubric
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 320)

# 
# VIDEO RECORDING SETUP
# 
# Set up the XVID codec to record the output video for the Hackster page submission
fourcc = cv2.VideoWriter_fourcc(*"XVID")
# Output file path, codec, target FPS (10.0), and frame size (320x320)
out = cv2.VideoWriter("/home/pi/object_detect/output.avi", fourcc, 10.0, (320, 320))

# 
# TRACKING VARIABLES (For FPS estimation)
# 
frame_count = 0          # frames processed so far
last_detections = None   # most recent net.forward() output
last_time = time.time()  # timestamp of the previous frame, for FPS math
fps = 0.0                # instantaneous FPS of the last frame
fps_log = []  # Store fps values to calculate the overall average at the end

print("Starting object detection...")
print("Press Q to quit")
print(f"Recording to: /home/pi/object_detect/output.avi")

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Camera read failed")
            break

        frame_count += 1
        img_h, img_w = frame.shape[:2]

        # Object detection every frame:
        # Convert the frame into a binary "blob" that the neural network can read.
        # Scale factor 0.007843 (which is 1/127.5), resize to 300x300 for the model, 
        # and subtract mean value 127.5 — together these map pixels to roughly [-1, 1].
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (300, 300), 127.5)
        net.setInput(blob)
        
        # Forward pass: compute the actual predictions (detections)
        last_detections = net.forward()

        # Draw bounding boxes
        if last_detections is not None:
            # Loop over all detections found in this frame
            for i in range(last_detections.shape[2]):
                # Extract the confidence (probability) associated with the prediction
                confidence = last_detections[0, 0, i, 2]
                
                # Filter out weak detections by ensuring confidence > 50%
                if confidence > 0.5:
                    idx = int(last_detections[0, 0, i, 1])
                    label = CLASSES[idx]

                    # Skip drawing if the object isn't in our target list
                    if label not in TARGET_CLASSES:
                        continue

                    # The neural network returns bounding box coordinates as percentages (0.0 to 1.0)
                    # We multiply by the actual image width and height to get exact pixel locations
                    box = last_detections[0, 0, i, 3:7] * np.array(
                        [img_w, img_h, img_w, img_h]
                    )
                    startX, startY, endX, endY = box.astype("int")

                    # Draw the bounding box rectangle on the frame
                    cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 255, 0), 1)
                    
                    # Draw the label and confidence percentage above the bounding box
                    cv2.putText(
                        frame,
                        f"{label}:{confidence:.2f}",
                        (startX, max(12, startY - 5)), # Ensure text doesn't go off the top of the screen
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.3,
                        (0, 255, 0),
                        1,
                    )

        # FPS calculation
        # Calculate how long it took to process the frame
        now = time.time()
        fps = 1.0 / (now - last_time)
        last_time = now
        fps_log.append(fps)

        # Overlay FPS and frame count on frame
        cv2.putText(
            frame,
            f"FPS:{fps:.1f}",
            (5, 15),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.4,
            (0, 255, 255),
            1,
        )
        cv2.putText(
            frame,
            f"Frame:{frame_count}",
            (5, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.4,
            (0, 255, 255),
            1,
        )

        # Write the annotated frame to the recording
        out.write(frame)

        # Show on laptop
        cv2.imshow("Detection - MobileNet SSD", frame)

        # Print a rolling average to the terminal every 30 frames
        # (only fires once frame_count is a multiple of 30, so the slice has 30 entries)
        if frame_count % 30 == 0:
            avg_fps = sum(fps_log[-30:]) / 30
            print(
                f"[Frame {frame_count}] FPS: {fps:.1f} Avg FPS (last 30): {avg_fps:.1f}"
            )

        # Break the loop when Q is pressed in the preview window
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

except KeyboardInterrupt:
    print("\nStopped by user")

finally:
    # Summarize FPS statistics collected over the run
    if fps_log:
        avg = sum(fps_log) / len(fps_log)
        mn = min(fps_log)
        mx = max(fps_log)
        print(f"\n--- FPS Summary ---")
        print(f"Total frames : {frame_count}")
        print(f"Average FPS  : {avg:.2f}")
        print(f"Min FPS      : {mn:.2f}")
        print(f"Max FPS      : {mx:.2f}")
        print(f"Video saved  : /home/pi/object_detect/output.avi")

    # release hardware and file resources
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    print("Done.")

Credits

Youjia Tong
1 project • 1 follower
Beiming Zhang
1 project • 1 follower
Aniket Jadhav
1 project • 0 followers
Thanks to Youjia Tong, Aniket Jadhav, and Beiming Zhang.

Comments