Felix TseNafil AtiqAlexNeel Racherla
Published

The Kernel Kart

An autonomous Raspberry Pi 5-powered car that uses computer vision and hardware drivers for real-time lane keeping and image recognition

IntermediateShowcase (no instructions)27
The Kernel Kart

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Raspberry Pi 5
Raspberry Pi 5
×1
USB Li Ion Battery Charger
Adafruit USB Li Ion Battery Charger
×1
optical speed encoder
×1
RC car
×1

Software apps and online services

OpenCV
OpenCV

Story

Read more

Code

object_detection.py

Python
object detection code in python using yolo library for computer vision
# ELEC 553 Final Project - Object Recognition Demo
# Standalone YOLO inference on webcam frames, with FPS reporting and video output.


import csv
import time
import warnings

import cv2
from ultralytics import YOLO

warnings.filterwarnings("ignore")

# ----------------------------
# Configuration
# ----------------------------
CAM_INDEX = 0
FRAME_WIDTH = 320
FRAME_HEIGHT = 320
INFERENCE_SIZE = 320

MODEL_PATH = "yolov5su.pt"
OUTPUT_VIDEO = "yolo_output.mp4"
FPS_LOG_CSV = "yolo_fps_log.csv"

CONF_THRESHOLD = 0.25
FPS_PRINT_EVERY = 10
FPS_WINDOW = 30


"""
    Function to draw detections, given a
    results list from the model.

    Returns the number of detections that
    are above the confidence threshold.
"""


def draw_detections(frame, results, class_names):
    detection_count = 0

    # go through each potential detection
    for box in results[0].boxes:
        # get the confidence
        confidence = float(box.conf[0])

        # if we're not confident enough about this
        # detection, look through the next.
        if confidence < CONF_THRESHOLD:
            continue

        # get the bounding box of the detection
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        # get the object name associated with this detection.
        class_id = int(box.cls[0])
        class_name = class_names[class_id]

        # now display the name and its confidence
        # and increment the detections
        label = f"{class_name} {confidence:.2f}"
        detection_count += 1

        # create a rectangle and add text for this detection
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        # add the label for what the object is, and our confidence.
        cv2.putText(
            frame,
            label,
            (x1, max(20, y1 - 8)),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.45,
            (0, 255, 0),
            2,
        )

    # now return the amount of confident
    # detections for this current frame.
    return detection_count


def main():
    print("Loading YOLO model...")
    model = YOLO(MODEL_PATH)
    print(f"Model loaded: {MODEL_PATH}")

    # open the capture.
    cap = cv2.VideoCapture(CAM_INDEX)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)

    if not cap.isOpened():
        print("ERROR: could not open webcam.")
        return

    # set up the video feed and writer.
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(
        OUTPUT_VIDEO,
        fourcc,
        10.0,
        (FRAME_WIDTH, FRAME_HEIGHT),
    )

    # initialize frame_times,
    # a graph of fps times (instant_fps).
    frame_times = []
    frame_count = 0
    start_time = time.perf_counter()
    previous_time = start_time

    # open the csv log!
    with open(FPS_LOG_CSV, "w", newline="") as csv_file:
        # set up the csv log.
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(["frame", "instant_fps", "rolling_fps", "detections"])

        print("YOLO demo running.")
        print("Press q in the video window to stop.")

        while True:
            # get the current camera feed for this frame!
            ret, frame = cap.read()

            if not ret:
                print("WARNING: failed to read frame.")
                break

            frame = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))

            # get the time needed for an inference
            # to complete.
            inference_start = time.perf_counter()
            # run the model
            results = model(frame, imgsz=INFERENCE_SIZE, verbose=False)
            # get time after
            inference_end = time.perf_counter()

            # with the model's results, display the
            # detections with rectangles.
            # and get the detection count from the
            # function, as well.
            detection_count = draw_detections(frame, results, model.names)

            # get the total frame time, consisting of
            # the time needed to infer, and display.
            now = time.perf_counter()
            total_frame_time = now - previous_time
            previous_time = now

            # calculate the fps for both
            # inference and total.
            instant_fps = 1.0 / total_frame_time if total_frame_time > 0 else 0.0
            inference_fps = (
                1.0 / (inference_end - inference_start)
                if inference_end > inference_start
                else 0.0
            )

            # add this fps to the fps graph!
            # if the graph is full, then remove the oldest entry.
            frame_times.append(total_frame_time)
            if len(frame_times) > FPS_WINDOW:
                frame_times.pop(0)

            # get the average fps of this current window of frametimes.
            rolling_dt = sum(frame_times) / len(frame_times)
            rolling_fps = 1.0 / rolling_dt if rolling_dt > 0 else 0.0

            # increment the frames we've iterated through
            # and write our current frame, current fps, average fps, and detection count
            frame_count += 1
            csv_writer.writerow(
                [
                    frame_count,
                    f"{instant_fps:.3f}",
                    f"{rolling_fps:.3f}",
                    detection_count,
                ]
            )

            # display the FPS stats
            cv2.putText(
                frame,
                f"FPS: {rolling_fps:.2f} | Inference FPS: {inference_fps:.2f}",
                (10, 22),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.45,
                (0, 255, 255),
                1,
            )

            # display the number of things detected by YOLO.
            cv2.putText(
                frame,
                f"Detections: {detection_count}",
                (10, 42),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.45,
                (0, 255, 255),
                1,
            )

            # display the frame.
            writer.write(frame)
            cv2.imshow("ELEC 553 YOLO Object Recognition", frame)

            # if FPS_PRINT_EVERY frames have passed
            # then print the stats in console.
            if frame_count % FPS_PRINT_EVERY == 0:
                print(
                    f"frame={frame_count} "
                    f"rolling_fps={rolling_fps:.2f} "
                    f"inference_fps={inference_fps:.2f} "
                    f"detections={detection_count}"
                )

            # exit key
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

    # get the time this ran for,
    # and the average fps for cv.
    # this average fps isn't like rolling
    # and gets the TOTAL average, rather than
    # what's currently on graph.
    total_time = time.perf_counter() - start_time
    average_fps = frame_count / total_time if total_time > 0 else 0.0

    # end the video feed for cv.
    # along with camera
    cap.release()
    writer.release()
    cv2.destroyAllWindows()

    print("YOLO demo finished.")
    print(f"Frames processed: {frame_count}")
    print(f"Total runtime: {total_time:.2f} seconds")
    print(f"Average FPS: {average_fps:.2f}")
    print(f"Saved annotated video: {OUTPUT_VIDEO}")
    print(f"Saved FPS log: {FPS_LOG_CSV}")


# cv run start.
if __name__ == "__main__":
    main()

self_drive.py

Python
self driving code for raspberry pi
# ELEC 553 Final Project
# Self-driving RC car: lane keeping, red stop box detection, and encoder-based speed control.

# References:
# raja_961, "Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV"
# https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/
# prior 424 project on Hackster: https://www.hackster.io/team-6/lane-keeping-rc-car-ebdd08


import math
import os
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
import RPi.GPIO as GPIO

# ============================================================
# GPIO / PWM setup constants
# ============================================================

# GPIO pins for ESC (drive motor) and steering servo
ESC_PIN = 18
SERVO_PIN = 19

# Both ESC and servo expect a 50 Hz PWM signal (20 ms period)
PWM_FREQ = 50

# 7.5% duty cycle is neutral for both the ESC and the servo
ZERO_SPEED = 7.5
STRAIGHT_STEER = 7.5


# ============================================================
# Tuning variables for our specific car
# ============================================================

# Speed values found through floor testing  car only moves after 7.91
BASE_SPEED = 7.91
MAX_SPEED = 7.944
MIN_SPEED = 7.9

# Lane-detection ROI  only look at the bottom 15% of the frame
ROI_RATIO = 0.85

# Step size used by the encoder feedback loop
SPEED_CHANGE = 0.001


# ============================================================
# Steering tuning (PD controller)
# ============================================================

# Steering centerpoint
BASE_TURN = STRAIGHT_STEER

# Proportional and derivative gains
KP = 0.06
KD = 0.01

# Hard limits on the servo command  ESC mechanical range is 6%-9%
STEER_MIN = 6.0
STEER_MAX = 9.0


# ============================================================
# Camera setup
# ============================================================

# Lower resolution to keep OpenCV processing fast on Pi
FRAME_WIDTH = 160
FRAME_HEIGHT = 120
CAM_IDX = 0


# ============================================================
# Encoder setup (sysfs paths exposed by the kernel driver)
# ============================================================

ENCODER_PERIOD_PATH = "/sys/module/encoder/parameters/encoder_period_ns"
ENCODER_COUNT_PATH = "/sys/module/encoder/parameters/encoder_count"

# Target encoder period band (ns between pulses)
# Smaller period = faster wheel; larger period = slower wheel
TARGET_PERIOD_MIN = 8_000_000
TARGET_PERIOD_MAX = 40_000_000

# How long since the last pulse before we treat the wheel as stopped
ENCODER_STALE_S = 0.3


# ============================================================
# Red stop box setup
# ============================================================

# Percentage of red pixels in the ROI required to count as a stop box
RED_THRESHOLD_PCT = 8.0

# How long to pause at the first stop box
STOP_PAUSE_S = 3.0

# Frames to ignore red detection after the first stop, so we don't re-trigger
SECOND_STOP_IGNORE_TICKS = 100


# ============================================================
# Loop setup
# ============================================================

# Hard cap on loop iterations so the motor can't run forever on a crash
MAX_TICKS = 4000

# Approx 50 Hz loop rate target
RUN_DELAY_S = 0.023


# ============================================================
# Module-level state
# ============================================================

# Global PWM handles set up in setup_gpio()
pwm_esc = None
pwm_serv = None

# Encoder freshness tracking
last_encoder_count = 0
last_encoder_time = 0.0

# Stored encoder values for debug / inspection
global_enc_vals = []


# ============================================================
# Motor helpers
# ============================================================


def setup_gpio():
    # Initialize GPIO and start both PWM channels at neutral.
    global pwm_esc, pwm_serv

    GPIO.setmode(GPIO.BCM)
    GPIO.setup(ESC_PIN, GPIO.OUT)
    GPIO.setup(SERVO_PIN, GPIO.OUT)

    # Create PWM channels at 50 Hz on both pins
    pwm_esc = GPIO.PWM(ESC_PIN, PWM_FREQ)
    pwm_serv = GPIO.PWM(SERVO_PIN, PWM_FREQ)

    # Start both at neutral so the ESC arms cleanly
    pwm_esc.start(ZERO_SPEED)
    pwm_serv.start(STRAIGHT_STEER)

    # Give the ESC a couple seconds to recognize neutral and arm
    time.sleep(2)
    print("GPIO/PWM initialized.")


def reset_car():
    # Set the speed motor to neutral and the steering straight.
    if pwm_esc is not None:
        pwm_esc.ChangeDutyCycle(ZERO_SPEED)
    if pwm_serv is not None:
        pwm_serv.ChangeDutyCycle(STRAIGHT_STEER)
    print("Car stopped and steering straight.")


def start_car():
    # Bring the car up to base speed once the user has confirmed the start.
    print("Car initialized: starting drive.")
    pwm_serv.ChangeDutyCycle(STRAIGHT_STEER)
    pwm_esc.ChangeDutyCycle(BASE_SPEED)


def cleanup_gpio():
    # Always called on exit so the motor doesn't keep driving.
    global pwm_esc, pwm_serv

    try:
        # First put outputs back to neutral
        reset_car()
        time.sleep(0.3)

        # Then stop both PWM channels
        if pwm_esc is not None:
            pwm_esc.stop()
        if pwm_serv is not None:
            pwm_serv.stop()
    except Exception as e:
        print("cleanup warning:", e)

    # Drop references so __del__ doesn't fire on already-cleaned objects
    pwm_esc = None
    pwm_serv = None
    GPIO.cleanup()
    print("GPIO cleaned up.")


def clamp(value, low, high):
    # Keep a value inside a safe range.
    return max(low, min(high, value))


# ============================================================
# Encoder speed control
# ============================================================


def read_encoder_period_ns():
    # Read latest pulse-to-pulse period (ns) from the kernel driver.
    # Returns None if the wheel has stopped (count hasn't changed recently).
    global last_encoder_count, last_encoder_time

    now = time.time()

    # Driver might not be loaded  bail out quietly
    if not (os.path.exists(ENCODER_PERIOD_PATH) and os.path.exists(ENCODER_COUNT_PATH)):
        return None

    # Read both sysfs files (period and pulse count)
    try:
        with open(ENCODER_PERIOD_PATH, "r") as f:
            period = int(f.read().strip())
        with open(ENCODER_COUNT_PATH, "r") as f:
            count = int(f.read().strip())
    except (OSError, ValueError):
        return None

    # If pulse count has gone up, the wheel is still spinning
    if count != last_encoder_count:
        last_encoder_count = count
        last_encoder_time = now

    # If we haven't seen a fresh pulse in a while, treat as stopped
    if now - last_encoder_time > ENCODER_STALE_S:
        return None

    return period if period > 0 else None


def manage_speed(curr_speed):
    # Bang-bang controller: nudge throttle up or down based on encoder period.
    period = read_encoder_period_ns()

    if period is None:
        # No encoder reading  nudge forward assumed stopped or stalled
        reason = "NO_ENCODER"
        new_speed = curr_speed + 0.0005
    elif period < TARGET_PERIOD_MIN:
        # Wheel spinning too fast  back off
        reason = "TOO_FAST"
        new_speed = curr_speed - SPEED_CHANGE
    elif period > TARGET_PERIOD_MAX:
        # Wheel spinning too slow  push harder
        reason = "TOO_SLOW"
        new_speed = curr_speed + SPEED_CHANGE
    else:
        # Within target band  leave throttle alone
        reason = "GOOD"
        new_speed = curr_speed

    # Clamp to safe range no matter what the controller wants
    new_speed = clamp(new_speed, MIN_SPEED, MAX_SPEED)
    global_enc_vals.append(period if period is not None else 0)

    print(
        f"speed_ctrl period={period} reason={reason} speed={curr_speed:.3f}->{new_speed:.3f}"
    )
    return new_speed


# ============================================================
# Vision: lane detection
# ============================================================


def detect_edges(frame):
    # Detect blue lane tape using HSV thresholding then Canny edges.
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Looser saturation/value mins so shadowed blue tape still gets caught
    lower_blue = np.array([90, 30, 30], dtype="uint8")
    upper_blue = np.array([130, 255, 255], dtype="uint8")

    # Build the blue mask, then run edge detection on it
    mask = cv2.inRange(hsv, lower_blue, upper_blue)
    edges = cv2.Canny(mask, 50, 100)
    return edges


def region_of_interest(edges, roi_ratio=ROI_RATIO):
    # Mask out everything but the bottom slice of the frame.
    h, w = edges.shape
    mask = np.zeros_like(edges)

    # Polygon covers the lower (1 - roi_ratio) of the frame
    top_y = int(h * roi_ratio)
    polygon = np.array(
        [
            [
                (0, h),
                (0, top_y),
                (w, top_y),
                (w, h),
            ]
        ],
        np.int32,
    )

    cv2.fillPoly(mask, polygon, 255)
    return cv2.bitwise_and(edges, mask)


def detect_line_segments(cropped_edges):
    # Probabilistic Hough transform  returns a list of [x1,y1,x2,y2] line segments.
    return cv2.HoughLinesP(
        cropped_edges,
        rho=1,
        theta=np.pi / 180,
        threshold=10,
        lines=np.array([]),
        minLineLength=5,
        maxLineGap=150,
    )


def average_slope_intercept(frame, line_segments):
    # Group line segments into left/right based on slope, then average each group.
    lane_lines = []
    if line_segments is None:
        return lane_lines

    height, width, _ = frame.shape
    left_fit = []
    right_fit = []

    # Only accept segments in the outer third of their respective side
    boundary = 1 / 3
    left_region_boundary = width * (1 - boundary)
    right_region_boundary = width * boundary

    # Iterate over every detected segment
    for line_segment in line_segments:
        for x1, y1, x2, y2 in line_segment:
            # Skip vertical lines (would divide by zero)
            if x1 == x2:
                continue

            # Compute slope and intercept for this segment
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)

            # Negative slope = left lane (in image coords with y growing down)
            if slope < 0:
                if x1 < left_region_boundary and x2 < left_region_boundary:
                    left_fit.append((slope, intercept))
            else:
                # Positive slope = right lane
                if x1 > right_region_boundary and x2 > right_region_boundary:
                    right_fit.append((slope, intercept))

    # Average each side's segments into a single representative line
    if len(left_fit) > 0:
        lane_lines.append(make_points(frame, np.average(left_fit, axis=0)))
    if len(right_fit) > 0:
        lane_lines.append(make_points(frame, np.average(right_fit, axis=0)))

    return lane_lines


def make_points(frame, line):
    # Convert a (slope, intercept) pair into two endpoint coords for drawing.
    height, width, _ = frame.shape
    slope, intercept = line

    # Bottom of the frame down to halfway up
    y1 = height
    y2 = int(y1 / 2)

    # Avoid division by zero on near-horizontal lines
    if abs(slope) < 0.1:
        slope = 0.1 if slope >= 0 else -0.1

    # Solve for x given y at each end
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)

    return [[x1, y1, x2, y2]]


def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6):
    # Draw averaged lane lines onto a copy of the frame.
    line_image = np.zeros_like(frame)

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                cv2.line(
                    line_image,
                    (int(x1), int(y1)),
                    (int(x2), int(y2)),
                    line_color,
                    line_width,
                )

    # Blend the line overlay with the original frame
    return cv2.addWeighted(frame, 0.8, line_image, 1, 1)


def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
    # Draw the red intended-heading line for visualization.
    heading_image = np.zeros_like(frame)
    height, width, _ = frame.shape

    # Convert the steering angle to a line going up from the bottom-center
    steering_angle_radian = steering_angle / 180.0 * math.pi
    x1 = int(width / 2)
    y1 = height

    # Guard against tan blowups when angle is exactly 90 deg
    tan_val = math.tan(steering_angle_radian)
    if abs(tan_val) < 1e-3:
        x2 = x1
    else:
        x2 = int(x1 - height / 2 / tan_val)
    y2 = int(height / 2)

    cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
    return cv2.addWeighted(frame, 0.8, heading_image, 1, 1)


def get_steering_angle(frame, lane_lines):
    # Convert detected lane line positions into a steering angle around 90 deg.
    height, width, _ = frame.shape
    mid = int(width / 2)

    # Both lanes detected  steer toward the average of their top points
    if len(lane_lines) >= 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        x_offset = (left_x2 + right_x2) / 2 - mid
        y_offset = int(height * 0.3)

    # Only one lane  use the line's slope direction directly
    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
        y_offset = int(height * 0.5)

    # No lanes  go straight
    else:
        x_offset = 0
        y_offset = int(height * 0.75)

    # Convert offset into degrees off-straight, then shift to 90-centered
    angle_to_mid_radian = math.atan(x_offset / y_offset)
    angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
    steering_angle = angle_to_mid_deg + 90

    return steering_angle


# ============================================================
# Vision: red stop box detection
# ============================================================


def isRedFloorVisible(image):
    # Return (is_stop, mask, percentage) for red detection in the lower frame.
    h, w = image.shape[:2]

    # Only look at the bottom 30% of the frame  stop paper is on the ground
    lower_img = image[int(h * 0.7) :, :]
    hsv_img = cv2.cvtColor(lower_img, cv2.COLOR_BGR2HSV)

    # Red wraps around the hue circle, so we need two ranges
    lower_red1 = np.array([0, 40, 60], dtype="uint8")
    upper_red1 = np.array([10, 255, 255], dtype="uint8")
    lower_red2 = np.array([170, 40, 60], dtype="uint8")
    upper_red2 = np.array([180, 255, 255], dtype="uint8")

    # Build the two masks and combine them
    mask1 = cv2.inRange(hsv_img, lower_red1, upper_red1)
    mask2 = cv2.inRange(hsv_img, lower_red2, upper_red2)
    mask = cv2.bitwise_or(mask1, mask2)

    # Compute the percentage of red pixels and check the threshold
    percentage_detected = np.count_nonzero(mask) * 100.0 / mask.size
    result = percentage_detected > RED_THRESHOLD_PCT

    if result:
        print(f"Red detected: {percentage_detected:.1f}%")

    return result, mask, percentage_detected


# ============================================================
# Plot helpers
# ============================================================


def plot_pd(p_vals, d_vals, error, show_img=False):
    # Required plot 1: P response, D response, and error vs frame number.
    plt.figure()
    t_ax = np.arange(len(p_vals))
    plt.plot(t_ax, p_vals, "-", label="Proportional response")
    plt.plot(t_ax, d_vals, "-", label="Derivative response")
    plt.plot(t_ax, error, "--", label="Error")
    plt.xlabel("Frames")
    plt.legend()
    plt.title("PD and Error Value with Time")
    plt.savefig("PDError.png")

    if show_img:
        plt.show()
    plt.clf()


def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    # Required plot 2: speed duty, steering duty, and error vs frame number.
    plt.figure()
    t_ax = np.arange(len(speed_pwms))
    plt.plot(t_ax, speed_pwms, "-", label="Speed duty cycle")
    plt.plot(t_ax, turn_pwms, "-", label="Steering duty cycle")
    plt.plot(t_ax, error, "--", label="Error")
    plt.xlabel("Frames")
    plt.legend()
    plt.title("Speed, Steering, and Error with Time")
    plt.savefig("SpeedSteeringError.png")

    if show_img:
        plt.show()
    plt.clf()


def save_csv(frames, errors, speeds, steers, p_vals, d_vals):
    # Save raw values so plots can be regenerated later if needed.
    if len(frames) == 0:
        return

    # Pack columns and write a header for clarity
    data = np.column_stack([frames, errors, speeds, steers, p_vals, d_vals])
    np.savetxt(
        "run_log.csv",
        data,
        delimiter=",",
        header="frame,error,speed_duty,steering_duty,p_response,d_response",
        comments="",
    )


# ============================================================
# Frame processing used in both preview and driving loops
# ============================================================


def process_frame(original_frame):
    # Resize once and run the full vision pipeline on a single frame.
    frame = cv2.resize(original_frame, (FRAME_WIDTH, FRAME_HEIGHT))

    # Lane detection pipeline: edges -> ROI -> Hough -> averaging
    edges = detect_edges(frame)
    roi = region_of_interest(edges, roi_ratio=ROI_RATIO)
    line_segments = detect_line_segments(roi)
    lane_lines = average_slope_intercept(frame, line_segments)
    lane_lines_image = display_lines(frame, lane_lines)

    # Compute steering angle and overlay the heading line
    steering_angle = get_steering_angle(frame, lane_lines)
    heading_image = display_heading_line(lane_lines_image, steering_angle)

    # Red stop box check on the same frame
    is_stop, red_mask, red_pct = isRedFloorVisible(frame)

    # Error in degrees from straight (90)
    error = steering_angle - 90

    return (
        frame,
        edges,
        lane_lines,
        steering_angle,
        heading_image,
        is_stop,
        red_pct,
        error,
    )


# ============================================================
# Main program
# ============================================================


def main():
    # Local state for the run
    last_time = time.time()
    last_error = 0
    second_stop_tick = 0
    passed_first_stop_sign = False

    # Buffers for the required plots and the CSV log
    frames = []
    p_vals = []
    d_vals = []
    err_vals = []
    speed_vals = []
    steer_vals = []

    # Open the camera
    video = cv2.VideoCapture(CAM_IDX, cv2.CAP_V4L2)
    video.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
    video.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)

    if not video.isOpened():
        print("ERROR: camera could not open")
        return

    # ------------------------------------------------------------
    # Preview loop: hold motor at neutral, show camera feed,
    # wait for user to press 's' to start driving.
    # ------------------------------------------------------------
    print("Preview mode. Press 's' in the OpenCV window to start.")
    print("Press 'q' in the OpenCV window to quit.")
    reset_car()

    preview_counter = 0
    while True:
        # Grab a frame and process it (no motor commands here)
        ret, original_frame = video.read()
        if not ret:
            print("No frame in preview")
            continue

        _, edges, lane_lines, steering_angle, heading_image, _, red_pct, error = (
            process_frame(original_frame)
        )
        preview_counter += 1

        # Overlay debug info on the frame
        cv2.putText(
            heading_image,
            f"PREVIEW err={error:+d} angle={steering_angle} lanes={len(lane_lines)} red={red_pct:.1f}% | s=start q=quit",
            (5, 15),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.35,
            (255, 255, 255),
            1,
        )

        # Show both the heading image and the raw edges
        cv2.imshow("self_drive", heading_image)
        cv2.imshow("edges", edges)

        # Periodic terminal log so we can see the controller is alive
        if preview_counter % 10 == 0:
            print(
                f"PREVIEW f={preview_counter} err={error:+d} angle={steering_angle} lanes={len(lane_lines)} red={red_pct:.1f}%"
            )

        # Check for keypress to start or quit
        key = cv2.waitKey(1) & 0xFF
        if key == ord("s"):
            print("Starting car.")
            break
        if key == ord("q"):
            print("Quit before start.")
            return

    # ------------------------------------------------------------
    # Driving loop: car runs the track until done or MAX_TICKS hits
    # ------------------------------------------------------------
    curr_speed = BASE_SPEED
    counter = 0
    start_car()
    last_time = time.time()

    while counter < MAX_TICKS:
        # Grab and process the next frame
        ret, original_frame = video.read()
        if not ret:
            print("No frame while driving")
            continue

        _, edges, lane_lines, steering_angle, heading_image, is_stop, red_pct, error = (
            process_frame(original_frame)
        )

        # --- PD steering control ---
        now = time.time()
        dt = now - last_time
        last_time = now
        if dt <= 0:
            dt = 1e-3

        # Compute P and D contributions
        proportional = KP * error
        derivative = KD * (error - last_error) / dt
        last_error = error

        # Combine and clamp to mechanical range
        turn_amt = BASE_TURN + proportional + derivative
        turn_amt = clamp(turn_amt, STEER_MIN, STEER_MAX)
        pwm_serv.ChangeDutyCycle(turn_amt)

        # Append values for the required plots
        frames.append(counter)
        p_vals.append(proportional)
        d_vals.append(derivative)
        err_vals.append(error)
        speed_vals.append(curr_speed)
        steer_vals.append(turn_amt)

        # --- Stop box state machine (checked every 10 ticks) ---
        if counter % 10 == 0:
            if not passed_first_stop_sign:
                # Looking for the first stop box
                if is_stop:
                    print("FIRST STOP BOX detected. Pausing.")
                    pwm_esc.ChangeDutyCycle(ZERO_SPEED)
                    time.sleep(STOP_PAUSE_S)
                    passed_first_stop_sign = True
                    second_stop_tick = counter
                    last_time = time.time()
            elif counter > second_stop_tick + SECOND_STOP_IGNORE_TICKS:
                # Cooldown over  looking for the second/final stop box
                if is_stop:
                    print("SECOND STOP BOX detected. Final stop.")
                    pwm_esc.ChangeDutyCycle(ZERO_SPEED)
                    break

        # --- Encoder-based speed control (every 3 ticks) ---
        if counter % 3 == 0:
            curr_speed = manage_speed(curr_speed)

        # In sharp turns, drop to minimum throttle so the servo has more authority
        if abs(error) > 15:
            applied_speed = MIN_SPEED
        else:
            applied_speed = curr_speed

        # Apply the speed to the ESC
        pwm_esc.ChangeDutyCycle(applied_speed)

        # --- Debug overlays ---
        cv2.putText(
            heading_image,
            f"DRIVE err={error:+d} steer={turn_amt:.2f} speed={curr_speed:.3f} red={red_pct:.1f}%",
            (5, 15),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.35,
            (255, 255, 255),
            1,
        )

        cv2.imshow("self_drive", heading_image)
        cv2.imshow("edges", edges)

        # Terminal print every 5 frames so the operator can see what's happening
        if counter % 5 == 0:
            period = read_encoder_period_ns()
            print(
                f"DRIVE f={counter} err={error:+d} P={proportional:+.2f} "
                f"D={derivative:+.2f} steer={turn_amt:.2f} speed={curr_speed:.3f} "
                f"lanes={len(lane_lines)} red={red_pct:.1f}% period={period}"
            )

        # Allow a manual quit via 'q' or Escape
        key = cv2.waitKey(1) & 0xFF
        if key == ord("q") or key == 27:
            print("User quit.")
            break

        counter += 1
        time.sleep(RUN_DELAY_S)

    # ------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------
    reset_car()
    video.release()
    cv2.destroyAllWindows()

    # Write out plots and raw CSV log
    plot_pd(p_vals, d_vals, err_vals)
    plot_pwm(speed_vals, steer_vals, err_vals)
    save_csv(frames, err_vals, speed_vals, steer_vals, p_vals, d_vals)
    print("Saved PDError.png, SpeedSteeringError.png, and run_log.csv")


# ============================================================
# Entry point  wrap main in try/finally so cleanup always runs
# ============================================================

if __name__ == "__main__":
    try:
        setup_gpio()
        main()
    finally:
        cleanup_gpio()

encoder.c

C/C++
encoder device driver for raspberry pi
// Optical speed encoder kernel driver
// ELEC 553 Final Project

// Reference: prior 424 project on Hackster
// https://www.hackster.io/team-6/lane-keeping-rc-car-ebdd08

#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/timekeeping.h>
#include <linux/ktime.h>
#include <linux/moduleparam.h>
#include <linux/of.h>


// irq number for encoder interrupt
static int irq_number;

// gpio descriptor for encoder OUT pin
static struct gpio_desc *my_enc;

// timestamp of last accepted pulse (ns)
static u64 prev_time_ns;

// most recent pulse-to-pulse period (ns), exposed via sysfs
static unsigned long encoder_period_ns;
module_param(encoder_period_ns, ulong, 0444);
MODULE_PARM_DESC(encoder_period_ns, "Latest encoder pulse-to-pulse period in ns");

// total accepted pulses, exposed via sysfs (lets userspace detect a stopped wheel)
static unsigned long encoder_count;
module_param(encoder_count, ulong, 0444);
MODULE_PARM_DESC(encoder_count, "Number of accepted encoder pulses");

// minimum interval between accepted pulses (debounce threshold)
static unsigned long debounce_ns = 1000000; // 1 ms
module_param(debounce_ns, ulong, 0644);
MODULE_PARM_DESC(debounce_ns, "Minimum valid time between pulses in ns");


// interrupt service routine  fires on rising edge of encoder pin
static irqreturn_t gpio_irq_handler(int irq, void *dev_id)
{
	u64 curr_time_ns;
	u64 elapsed_ns;
	int pin_val;

	// read current pin state and current time
	pin_val = gpiod_get_value(my_enc);
	curr_time_ns = ktime_get_ns();

	// time since previous accepted pulse
	elapsed_ns = curr_time_ns - prev_time_ns;

	// debounce: ignore pulses too close to previous valid one
	if (pin_val == 1 && elapsed_ns > debounce_ns) {
		// accept this pulse: update timestamp, period, and count
		prev_time_ns = curr_time_ns;
		encoder_period_ns = (unsigned long)elapsed_ns;
		encoder_count++;
	}

	return IRQ_HANDLED;
}


// probe function  called when DTS compatible string matches
static int enc_probe(struct platform_device *pdev)
{
	struct device *dev = &pdev->dev;
	int ret;

	printk(KERN_INFO "enc_probe - RUNNING\n");

	// get encoder gpio from device tree (DTS property "encoder-gpios")
	my_enc = devm_gpiod_get(dev, "encoder", GPIOD_IN);
	// error check
	if (IS_ERR(my_enc)) {
		printk(KERN_ERR "enc_probe - could not get encoder GPIO\n");
		return PTR_ERR(my_enc);
	}

	// map encoder gpio to irq num
	irq_number = gpiod_to_irq(my_enc);
	// error check
	if (irq_number < 0) {
		printk(KERN_ERR "enc_probe - could not map GPIO to IRQ\n");
		return irq_number;
	}

	// reset
	prev_time_ns = ktime_get_ns();
	encoder_period_ns = 0;
	encoder_count = 0;

	// register isr on rising edge
	ret = request_irq(
		irq_number,
		gpio_irq_handler,
		IRQF_TRIGGER_RISING,
		"speed_encoder_irq",
		pdev
	);
	// error check
	if (ret != 0) {
		printk(KERN_ERR "Error! Cannot request interrupt number: %d\n", irq_number);
		return ret;
	}

	printk(KERN_INFO "Speed Encoder Insert Successful. IRQ=%d\n", irq_number);
	return 0;
}


// remove function  called when module unloads
static void enc_remove(struct platform_device *pdev)
{
	// free the irq and print a message
	free_irq(irq_number, pdev);
	printk(KERN_INFO "Speed Encoder Removed.\n");
}


// match table  must match compatible string in encoder.dts
static struct of_device_id matchy_match[] = {
	{ .compatible = "custom,speed_encoder" },
	{ /* leave alone - keep this here (end node) */ }
};
MODULE_DEVICE_TABLE(of, matchy_match);


// platform driver object
static struct platform_driver speed_encoder = {
	.probe  = enc_probe,
	.remove = enc_remove,
	.driver = {
		.name           = "speed_encoder",
		.owner          = THIS_MODULE,
		.of_match_table = matchy_match,
	},
};

module_platform_driver(speed_encoder);

MODULE_DESCRIPTION("ELEC 553 Final Project - Optical Speed Encoder Driver");
MODULE_AUTHOR("Alex Smith");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:speed_encoder");

encoder.dts

C/C++
device tree source file for encoder device driver
/dts-v1/;
/plugin/;

// Device tree overlay for the optical speed encoder

/ {
    // Pi 5 SoC compatible string
    compatible = "brcm,bcm2712";

    fragment@0 {
        // root of the device tree
        target-path = "/";

        __overlay__ {
            speed_encoder {
                compatible = "custom,speed_encoder";
                // Encoder OUT pin connected to GPIO 17 (BCM)
                encoder-gpios = <&gpio 17 0>;
            };
        };
    };
};

Credits

Felix Tse
2 projects • 0 followers
Nafil Atiq
1 project • 1 follower
Alex
1 project • 1 follower
Neel Racherla
1 project • 0 followers
Hardware engineer

Comments