Team Cool

An autonomous car using OpenCV and speed encoders to do lane recognition and recognize stops

Intermediate · Showcase (no instructions) · 8 hours · 18
Team Cool

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Optical Speed Encoder
×1
Portable Charger
×1
Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
×1
DC motor (generic)
×1
Servo motor
×1

Software apps and online services

OpenCV
OpenCV

Hand tools and fabrication machines

Hot glue gun (generic)
Hot glue gun (generic)
Tape, Duct
Tape, Duct
Scissor, Electrician
Scissor, Electrician

Story

Read more

Code

my_overlay.dts

C/C++
Our modified device tree source
// Provided by https://www.hackster.io/m-e-g-g/the-magnificent-m-e-g-g-car-28ec89#code

/dts-v1/;
/plugin/;

/* Overlay: adds a node under the device-tree root whose compatible string
 * ("a_second_name") matches the platform driver in gpiod_driver_encoder.c,
 * so the kernel binds the driver and it can claim the GPIOs below. */
/ {
    fragment@0 {
        target-path ="/";
        __overlay__ {
            my_device_jerry {
                compatible = "a_second_name";
                /* BCM GPIO 5 and 6, flags = 0 (active-high).
                 * The driver requests "lights2" (GPIO 6) as the encoder input. */
                lights1-gpios = <&gpio 5 0 >;
		lights2-gpios = <&gpio 6 0 >;
            };
        };
    };

};

car.py

Python
# Team:Ayush Suresh, Daniel Merklen Pimenta De Medeiros, Hong-Jyun Wang, Kaan Yilmaz, Leo Marek
# Class: ELEC 424
# Final Project
# Fall 2025
#
# Code drawn from:
# https://www.hackster.io/m-e-g-g/the-magnificent-m-e-g-g-car-28ec89#code

import math
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
import RPi.GPIO as GPIO

# =============================================================================
# CONFIGURATION
# =============================================================================

# --- GPIO pins ---
PIN_ESC = 18  # Drive motor (ESC)
PIN_SERVO = 19  # Steering servo
PWM_FREQ_HZ = 50

# --- ESC / Servo duty cycles (%) ---
ESC_NEUTRAL_DUTY_CYCLE = 7.3
ESC_LOW_DUTY_CYCLE = 7.6
SERVO_NEUTRAL_DUTY_CYCLE = 6.5
SERVO_MIN_DUTY_CYCLE = 4.0
SERVO_MAX_DUTY_CYCLE = 9.0

# --- ESC calibration duty cycles ---
ESC_CAL_FULL_THROTTLE = 2.0  # Full throttle signal during calibration
ESC_CAL_FULL_REVERSE = 1.0  # Full reverse signal during calibration
ESC_CAL_THROTTLE_WAIT = 2  # Seconds to hold throttle signal
ESC_CAL_REVERSE_WAIT = 2  # Seconds to hold reverse signal
ESC_CAL_NEUTRAL_WAIT = 1  # Seconds to hold neutral after calibration

# --- Camera ---
CAM_INDEX = 0
FRAME_WIDTH = 160
FRAME_HEIGHT = 120

# --- PD controller ---
KP = 0.099
KD = KP * 0.42
STEERING_STRAIGHT_DEG = 90
STEERING_MAX_ERROR_DEG = 28
STEERING_ERROR_DEADBAND_DEG = 2
STEERING_SMOOTHING_ALPHA = 0.24
STEERING_MAX_D_TERM = 0.4
STEERING_MAX_STEP_DUTY = 0.14
MIN_CONTROL_DT_SEC = 0.02

# --- Lane detection: blue HSV thresholds ---
LANE_BLUE_HSV_LOWER = np.array([90, 50, 50], dtype="uint8")
LANE_BLUE_HSV_UPPER = np.array([130, 255, 255], dtype="uint8")
BLUE_BLUR_KERNEL_SIZE = 5
MASK_MORPH_KERNEL_SIZE = 3

# --- Lane detection: Canny edge thresholds ---
CANNY_LOW_THRESHOLD = 50
CANNY_HIGH_THRESHOLD = 100

# --- Lane detection: region of interest ---
ROI_VERTICAL_FRACTION = 0.5  # Keep the bottom fraction of the frame

# --- Lane detection: Hough transform ---
HOUGH_RHO = 1
HOUGH_THETA = np.pi / 180
HOUGH_THRESHOLD = 5
HOUGH_MIN_LINE_LEN = 3
HOUGH_MAX_LINE_GAP = 50
MIN_SEGMENT_LENGTH = 12
MIN_SEGMENT_SLOPE = 0.4
LANE_X_OFFSET_DEADBAND_PX = 5
LANE_X_OFFSET_BLEND_ALPHA = 0.35

# --- Lane detection: left/right region split ---
LANE_BOUNDARY_FRACTION = 1 / 4  # Inner boundary as fraction of frame width

# --- Lane detection: degenerate slope guard ---
MIN_SLOPE = 0.1  # Replaces a zero slope to avoid division by zero

# --- Display: lane lines ---
LANE_LINE_COLOR = (0, 255, 0)  # Green (BGR)
LANE_LINE_WIDTH = 6

# --- Display: heading line ---
HEADING_LINE_COLOR = (0, 0, 255)  # Red (BGR)
HEADING_LINE_WIDTH = 5

# --- Display: overlay blend weight ---
FRAME_OVERLAY_ALPHA = 0.8  # Weight of original frame in addWeighted blends

# --- Stop sign (red floor) detection ---
RED_HSV_LOWER_1 = np.array([0, 40, 60], dtype="uint8")
RED_HSV_UPPER_1 = np.array([10, 255, 255], dtype="uint8")
RED_HSV_LOWER_2 = np.array([170, 40, 60], dtype="uint8")
RED_HSV_UPPER_2 = np.array([180, 255, 255], dtype="uint8")
RED_FLOOR_PCT_THRESHOLD = 18  # Minimum % of stop ROI that must be red
STOP_ROI_TOP_FRACTION = 0.25  # Start stop-marker detection one quarter down the frame
STOP_ROI_BOTTOM_FRACTION = (
    0.75  # End stop-marker detection three quarters down the frame
)

# --- Main loop timing and intervals ---
MAX_TICKS = 4000  # Maximum loop iterations before forced stop
STOP_CHECK_INTERVAL = 1  # Check for stop sign every tick
SECOND_STOP_TICK_BUFFER = 100  # Ticks after first stop before watching for second
FIRST_STOP_WAIT_SEC = 3.0  # Pause duration at first stop sign (seconds)
FINAL_STOP_CONFIRM_FRAMES = (
    2  # Require this many positive stop frames before final stop
)
STARTUP_WAIT_SEC = 3.0  # Wait after calibration before driving (seconds)
LOOP_DRIVE_WAIT_SEC = 0.023  # Brief drive window each tick (seconds)
SPEED_STEP_DUTY = 0.01  # Ramp low speed in gradually after stops

# --- Debug: print every N ticks to avoid terminal flood ---
DEBUG_PRINT_INTERVAL = 10

# =============================================================================
# HARDWARE SETUP
# =============================================================================

# Use Broadcom (BCM) pin numbering and configure the two PWM outputs.
GPIO.setmode(GPIO.BCM)
GPIO.setup(PIN_ESC, GPIO.OUT)
GPIO.setup(PIN_SERVO, GPIO.OUT)

# Software PWM at 50 Hz — the standard RC servo/ESC frame rate.
pwm_esc = GPIO.PWM(PIN_ESC, PWM_FREQ_HZ)
pwm_serv = GPIO.PWM(PIN_SERVO, PWM_FREQ_HZ)

# Begin in the neutral positions: drive motor stopped, wheels centered.
pwm_esc.start(ESC_NEUTRAL_DUTY_CYCLE)
pwm_serv.start(SERVO_NEUTRAL_DUTY_CYCLE)

# =============================================================================
# CAR CONTROL
# =============================================================================


def reset_car():
    """Put the car into its safe idle state: steering centered, motor stopped."""
    pwm_serv.ChangeDutyCycle(SERVO_NEUTRAL_DUTY_CYCLE)
    pwm_esc.ChangeDutyCycle(ESC_NEUTRAL_DUTY_CYCLE)


def start_car():
    """Begin driving: center the steering and apply the base forward speed."""
    print("Car ready")
    pwm_serv.ChangeDutyCycle(SERVO_NEUTRAL_DUTY_CYCLE)
    pwm_esc.ChangeDutyCycle(ESC_LOW_DUTY_CYCLE)


def set_speed(dc):
    """Set ESC duty cycle to control drive motor speed.

    Args:
        dc: PWM duty cycle in percent (ESC_NEUTRAL_DUTY_CYCLE ~ stopped,
            ESC_LOW_DUTY_CYCLE ~ slow forward).
    """
    pwm_esc.ChangeDutyCycle(dc)


def set_turn(dc):
    """Set servo duty cycle to control steering angle.

    Args:
        dc: PWM duty cycle in percent, expected within
            [SERVO_MIN_DUTY_CYCLE, SERVO_MAX_DUTY_CYCLE];
            SERVO_NEUTRAL_DUTY_CYCLE steers straight.
    """
    pwm_serv.ChangeDutyCycle(dc)


def clamp(value, low, high):
    """Bound *value* to the closed interval [low, high] and return it."""
    capped = min(high, value)
    return max(low, capped)


def rate_limit(target, current, max_delta):
    """Move *current* toward *target*, changing by at most *max_delta* per call."""
    delta = target - current
    if delta > max_delta:
        delta = max_delta
    elif delta < -max_delta:
        delta = -max_delta
    return current + delta


def calibrate_esc():
    """Calibrate the ESC (only needed if lacking gray/orange/black wires).

    Holds the neutral duty cycle for two seconds so the ESC arms.
    NOTE(review): the ESC_CAL_FULL_THROTTLE/REVERSE constants defined above
    are unused here — presumably a full-range calibration sweep was removed;
    confirm neutral-only arming is sufficient for this ESC.
    """
    print("Calibrating ESC...")
    set_speed(ESC_NEUTRAL_DUTY_CYCLE)
    wait(2.0)
    print("Calibration done")


def wait(wait_time):
    """Block for *wait_time* seconds.

    The original implementation busy-looped on time.perf_counter(), pegging a
    Pi CPU core at 100% for the whole pause (up to several seconds at stops).
    time.sleep gives the same blocking behavior while yielding the CPU; the
    RPi.GPIO software-PWM threads keep running either way.

    Args:
        wait_time: Duration in seconds; nonpositive values return immediately.
    """
    if wait_time > 0:
        time.sleep(wait_time)


# =============================================================================
# VISION PIPELINE
# =============================================================================


def detect_edges(frame, debug=False):
    """Isolate blue lane markings in *frame* and return their Canny edge map.

    Args:
        frame: BGR camera image.
        debug: When truthy, print mask/edge pixel counts.

    Returns:
        Single-channel uint8 edge image.
    """
    hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    blue_mask = cv2.inRange(hsv_frame, LANE_BLUE_HSV_LOWER, LANE_BLUE_HSV_UPPER)
    edge_map = cv2.Canny(blue_mask, CANNY_LOW_THRESHOLD, CANNY_HIGH_THRESHOLD)

    if debug:
        n_blue = np.count_nonzero(blue_mask)
        n_edge = np.count_nonzero(edge_map)
        print(f"  [detect_edges] blue mask pixels={n_blue}, edge pixels={n_edge}")

    return edge_map


def region_of_interest(edges, debug=False):
    """Zero out the upper part of the edge image, keeping the bottom fraction.

    Args:
        edges: Single-channel edge image.
        debug: When truthy, print the ROI boundary and pixel count.

    Returns:
        Edge image with everything above the ROI cut to zero.
    """
    height, width = edges.shape
    roi_top = int(height * (1 - ROI_VERTICAL_FRACTION))
    keep_mask = np.zeros_like(edges)
    corners = np.array(
        [[(0, height), (0, roi_top), (width, roi_top), (width, height)]],
        np.int32,
    )
    cv2.fillPoly(keep_mask, corners, 255)
    roi = cv2.bitwise_and(edges, keep_mask)

    if debug:
        roi_pixels = np.count_nonzero(roi)
        print(
            f"  [region_of_interest] ROI top y={roi_top}/{height}, pixels in ROI={roi_pixels}"
        )

    return roi


def detect_line_segments(cropped_edges, debug=False):
    """Find candidate lane segments with the probabilistic Hough transform.

    Returns:
        Array of [[x1, y1, x2, y2]] segments, or None when nothing was found.
    """
    segments = cv2.HoughLinesP(
        cropped_edges,
        HOUGH_RHO,
        HOUGH_THETA,
        HOUGH_THRESHOLD,
        np.array([]),
        minLineLength=HOUGH_MIN_LINE_LEN,
        maxLineGap=HOUGH_MAX_LINE_GAP,
    )

    if debug:
        count = 0 if segments is None else len(segments)
        print(f"  [detect_line_segments] Hough found {count} raw segments")
        if segments is not None:
            for seg in segments:
                x1, y1, x2, y2 = seg[0]
                if x2 == x1:
                    slope = float("inf")
                else:
                    slope = (y2 - y1) / (x2 - x1)
                print(f"    segment ({x1},{y1})->({x2},{y2})  slope={slope:.2f}")

    return segments


def average_slope_intercept(frame, line_segments, debug=False):
    """Average raw Hough segments into at most one left and one right lane line.

    Segments left of the frame midline contribute to the left lane fit,
    the rest to the right; vertical segments are skipped entirely.

    Returns:
        List of 0-2 lane lines in make_points() endpoint format.
    """
    if line_segments is None:
        if debug:
            print("  [average_slope_intercept] No segments to process")
        return []

    height, width, _ = frame.shape
    midline = width / 2
    left_fit = []
    right_fit = []

    for segment in line_segments:
        for x1, y1, x2, y2 in segment:
            if x1 == x2:
                continue  # skip truly vertical lines only
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - slope * x1
            if (x1 + x2) / 2 < midline:
                left_fit.append((slope, intercept))
            else:
                right_fit.append((slope, intercept))

    if debug:
        print(
            f"  [average_slope_intercept] left_fit={len(left_fit)}, right_fit={len(right_fit)}"
        )

    lane_lines = []
    if left_fit:
        lane_lines.append(make_points(frame, np.average(left_fit, axis=0)))
    if right_fit:
        lane_lines.append(make_points(frame, np.average(right_fit, axis=0)))

    return lane_lines


def make_points(frame, line):
    """Project a (slope, intercept) lane fit onto pixel endpoints.

    The returned segment spans from the bottom edge of the frame up to the
    vertical midpoint, matching the ROI used by the detector.

    Returns:
        [[x_bottom, y_bottom, x_mid, y_mid]] pixel coordinates.
    """
    height, _, _ = frame.shape
    slope, intercept = line

    # A perfectly horizontal fit would divide by zero below; nudge it.
    if slope == 0:
        slope = MIN_SLOPE

    y_bottom = height
    y_mid = int(y_bottom / 2)
    x_bottom = int((y_bottom - intercept) / slope)
    x_mid = int((y_mid - intercept) / slope)

    return [[x_bottom, y_bottom, x_mid, y_mid]]


def display_lines(frame, lines, line_color=LANE_LINE_COLOR, line_width=LANE_LINE_WIDTH):
    """Return *frame* blended with the detected lane lines drawn on top."""
    overlay = np.zeros_like(frame)
    for line in lines if lines is not None else []:
        for x1, y1, x2, y2 in line:
            cv2.line(overlay, (x1, y1), (x2, y2), line_color, line_width)
    return cv2.addWeighted(frame, FRAME_OVERLAY_ALPHA, overlay, 1, 1)


def display_heading_line(
    frame, steering_angle, line_color=HEADING_LINE_COLOR, line_width=HEADING_LINE_WIDTH
):
    """Return *frame* blended with a ray showing the current steering heading.

    The ray starts at the bottom-center of the frame; 90 degrees points
    straight up.
    """
    height, width, _ = frame.shape
    overlay = np.zeros_like(frame)
    angle_rad = steering_angle / 180.0 * math.pi
    base_x = int(width / 2)
    base_y = height
    tip_x = int(base_x - height / 2 / math.tan(angle_rad))
    tip_y = int(height / 2)
    cv2.line(overlay, (base_x, base_y), (tip_x, tip_y), line_color, line_width)
    return cv2.addWeighted(frame, FRAME_OVERLAY_ALPHA, overlay, 1, 1)


def get_steering_angle(frame, lane_lines):
    """Map detected lane lines to a steering angle in degrees (90 = straight).

    Two lanes: steer toward the midpoint of their upper endpoints.
    One lane: follow that lane's direction. No lanes: hold straight.
    """
    height, width, _ = frame.shape

    if len(lane_lines) == 2:
        _, _, left_x2, _ = lane_lines[0][0]
        _, _, right_x2, _ = lane_lines[1][0]
        x_offset = (left_x2 + right_x2) / 2 - int(width / 2)
    elif len(lane_lines) == 1:
        x1, _, x2, _ = lane_lines[0][0]
        x_offset = x2 - x1
    else:
        x_offset = 0

    y_offset = int(height / 2)
    heading_deg = int(math.atan(x_offset / y_offset) * 180.0 / math.pi)
    return heading_deg + STEERING_STRAIGHT_DEG


# =============================================================================
# STOP SIGN DETECTION
# =============================================================================


def isRedFloorVisible(image):
    """
    Detect whether a significant portion of the image contains red (stop zone).
    Adapted from hackster.io; HSV ranges adjusted for red floor segments.

    Returns:
        (bool, ndarray, float): detection result, debug feed, and red coverage percentage.
    """
    height, width, _ = image.shape
    # Restrict detection to a middle horizontal band of the frame
    # (STOP_ROI_TOP_FRACTION to STOP_ROI_BOTTOM_FRACTION of image height).
    roi_top = int(height * STOP_ROI_TOP_FRACTION)
    roi_bottom = int(height * STOP_ROI_BOTTOM_FRACTION)
    roi = image[roi_top:roi_bottom, :]
    hsv_img = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)

    # Red wraps around 0/180 degrees in HSV, so two ranges are needed
    mask1 = cv2.inRange(hsv_img, RED_HSV_LOWER_1, RED_HSV_UPPER_1)
    mask2 = cv2.inRange(hsv_img, RED_HSV_LOWER_2, RED_HSV_UPPER_2)
    mask = cv2.bitwise_or(mask1, mask2)

    # Morphological open removes speckle noise; close fills small holes.
    kernel = np.ones((3, 3), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

    # The stop verdict is based purely on the percentage of red in the ROI.
    pct_detected = np.count_nonzero(mask) * 100 / mask.size
    is_stop = pct_detected > RED_FLOOR_PCT_THRESHOLD

    # --- Build the annotated debug view: yellow ROI box on a frame copy ---
    annotated = image.copy()
    cv2.rectangle(
        annotated, (0, roi_top), (width - 1, roi_bottom - 1), (0, 255, 255), 1
    )

    # Tint the detected red pixels inside the ROI so they stand out.
    red_overlay = roi.copy()
    red_tint = np.zeros_like(roi)
    red_tint[:, :, 2] = 255  # pure red in BGR
    tinted_roi = cv2.addWeighted(roi, 0.65, red_tint, 0.35, 0)
    red_overlay[mask > 0] = tinted_roi[mask > 0]
    annotated[roi_top:roi_bottom, :] = red_overlay

    # Status banner: red text for STOP, green for CLEAR.
    status = "STOP" if is_stop else "CLEAR"
    status_color = (0, 0, 255) if is_stop else (0, 255, 0)
    cv2.putText(
        annotated,
        f"{status} red={pct_detected:.1f}% threshold={RED_FLOOR_PCT_THRESHOLD}%",
        (5, 18),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.45,
        status_color,
        1,
        cv2.LINE_AA,
    )

    # Side panels (full-frame sized, ROI rows filled): the binary mask...
    mask_bgr = np.zeros_like(image)
    mask_bgr[roi_top:roi_bottom, :] = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)

    # ...and the original colors surviving the mask.
    masked_color = np.zeros_like(image)
    masked_color[roi_top:roi_bottom, :] = cv2.bitwise_and(roi, roi, mask=mask)

    # Debug feed is the three views stacked left-to-right.
    debug_view = np.hstack((annotated, mask_bgr, masked_color))
    return is_stop, debug_view, pct_detected


# =============================================================================
# PLOTTING
# =============================================================================


def plot_pd(p_vals, d_vals, error, show_img=False):
    """Save a plot of P/D controller values and error over time.

    Args:
        p_vals: Proportional-term history, one entry per control tick.
        d_vals: Derivative-term history.
        error: Smoothed steering-error history (degrees).
        show_img: If True, also display the plot interactively.

    Side effects: writes "pd_plot.png" in the working directory.
    """
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(p_vals))
    ax1.plot(t_ax, p_vals, "-", label="P values")
    ax1.plot(t_ax, d_vals, "-", label="D values")
    ax2 = ax1.twinx()  # error gets its own y-axis scale
    ax2.plot(t_ax, error, "--r", label="Error")
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("PD Value")
    ax2.set_ylim(-STEERING_STRAIGHT_DEG, STEERING_STRAIGHT_DEG)
    ax2.set_ylabel("Error Value")
    plt.title("PD Values over time")
    fig.legend()
    fig.tight_layout()
    plt.savefig("pd_plot.png")
    if show_img:
        plt.show()
    # Close the figure instead of clf(): clf() keeps the Figure object alive,
    # so repeated calls accumulate figures and leak memory.
    plt.close(fig)


def plot_pwm(speed_pwms, turn_pwms, error, show_img=False):
    """Save a plot of speed, steering duty cycles, and error over time.

    Args:
        speed_pwms: Drive duty-cycle history, one entry per control tick.
        turn_pwms: Steering duty-cycle history.
        error: Smoothed steering-error history (degrees).
        show_img: If True, also display the plot interactively.

    Side effects: writes "voltage_plot.png" in the working directory.
    """
    fig, ax1 = plt.subplots()
    t_ax = np.arange(len(speed_pwms))
    ax1.plot(t_ax, speed_pwms, "-", label="Speed")
    ax1.plot(t_ax, turn_pwms, "-", label="Steering")
    # Normalize the error for display. Guard the original `error / np.max(error)`:
    # the steering deadband can zero every sample (div-by-zero), and an empty
    # history would make np.max raise.
    err = np.asarray(error, dtype=float)
    peak = float(np.max(err)) if err.size else 0.0
    if peak == 0.0:
        peak = 1.0
    ax1.plot(t_ax, err / peak, "--r", label="Error")
    ax1.set_xlabel("Frames")
    ax1.set_ylabel("Speed and Steer Duty Cycle")
    plt.title("Speed and Steer Duty Cycle, and error vs. time")
    fig.legend()
    plt.savefig("voltage_plot.png")
    if show_img:
        plt.show()
    # Close instead of clf() so repeated calls don't accumulate figures.
    plt.close(fig)


# =============================================================================
# MAIN LOOP
# =============================================================================


def main():
    """Drive the car: lane-follow with PD steering until the final stop marker.

    Flow: open the camera, arm the ESC, then loop — read a frame, detect blue
    lane lines, compute a PD steering correction, and watch for red floor
    markers. The car pauses FIRST_STOP_WAIT_SEC at the first marker and halts
    permanently once a second marker is confirmed. PD/PWM histories are
    plotted to PNG files on shutdown.
    """
    last_time = time.perf_counter()
    last_error = 0.0
    second_stop_tick = 0
    final_stop_hits = 0
    smoothed_error = 0.0
    last_turn_amt = SERVO_NEUTRAL_DUTY_CYCLE

    p_vals = []  # Proportional term history
    d_vals = []  # Derivative term history
    err_vals = []  # Error history
    speed_vals = []  # Speed duty cycle history
    steer_vals = []  # Steering duty cycle history

    video = cv2.VideoCapture(CAM_INDEX)
    video.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
    video.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)

    # --- Verify camera opened and print actual resolution ---
    # (Drivers may ignore the requested size, so report what was granted.)
    if not video.isOpened():
        print("ERROR: Could not open camera")
        return
    actual_w = video.get(cv2.CAP_PROP_FRAME_WIDTH)
    actual_h = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
    print(
        f"Camera opened: requested {FRAME_WIDTH}x{FRAME_HEIGHT}, got {actual_w}x{actual_h}"
    )

    start_car()
    applied_speed = ESC_LOW_DUTY_CYCLE
    counter = 0

    calibrate_esc()
    print("Calibration done, waiting to start")
    wait(STARTUP_WAIT_SEC)
    print("Done waiting")

    passed_first_stop = False
    while counter < MAX_TICKS:
        # pwm_esc.ChangeDutyCycle(ESC_ZERO_DUTY_CYCLE)

        ret, original_frame = video.read()
        if not ret or original_frame is None:
            print("Camera frame read failed, stopping")
            break

        # --- Debug: confirm frame shape each time ---
        # Set do_debug to 1 to enable verbose pipeline tracing.
        do_debug = 0
        if do_debug:
            print(f"\n[tick {counter}] frame shape={original_frame.shape}")

        # Force the working resolution in case the camera ignored the request.
        frame = cv2.resize(original_frame, (FRAME_WIDTH, FRAME_HEIGHT))

        # Vision pipeline — pass debug flag through each stage
        edges = detect_edges(frame, debug=do_debug)
        roi = region_of_interest(edges, debug=do_debug)
        line_segments = detect_line_segments(roi, debug=do_debug)
        lane_lines = average_slope_intercept(frame, line_segments, debug=do_debug)

        if do_debug:
            print(f"  [pipeline result] lane_lines found={len(lane_lines)}")

        lane_lines_image = display_lines(frame, lane_lines)
        steering_angle = get_steering_angle(frame, lane_lines)
        heading_image = display_heading_line(lane_lines_image, steering_angle)
        is_stop, stop_debug_image, stop_pct = isRedFloorVisible(frame)

        if do_debug:
            print(f"  [stop detection] red={stop_pct:.1f}% stop={is_stop}")

        # Debug display
        cv2.imshow("ROI edges", roi)
        # cv2.imshow("Lane lines", lane_lines_image)
        # cv2.imshow("Heading", heading_image)
        # cv2.imshow("Stop detection", stop_debug_image)

        # PD control
        # dt is floored so a fast loop iteration can't blow up the D term.
        now = time.perf_counter()
        dt = max(now - last_time, MIN_CONTROL_DT_SEC)

        # Clamp the raw error, then zero it inside the deadband so tiny
        # offsets don't cause steering jitter.
        raw_error = steering_angle - STEERING_STRAIGHT_DEG
        error = clamp(raw_error, -STEERING_MAX_ERROR_DEG, STEERING_MAX_ERROR_DEG)
        if abs(error) < STEERING_ERROR_DEADBAND_DEG:
            error = 0

        # Exponential smoothing on the error feeds both P and D terms.
        smoothed_error = (
            STEERING_SMOOTHING_ALPHA * error
            + (1 - STEERING_SMOOTHING_ALPHA) * smoothed_error
        )

        proportional = KP * smoothed_error
        derivative = KD * (smoothed_error - last_error) / dt
        derivative = clamp(derivative, -STEERING_MAX_D_TERM, STEERING_MAX_D_TERM)

        # Clamp to the servo's safe range, then rate-limit the change so the
        # steering moves smoothly between ticks.
        target_turn = clamp(
            SERVO_NEUTRAL_DUTY_CYCLE + proportional + derivative,
            SERVO_MIN_DUTY_CYCLE,
            SERVO_MAX_DUTY_CYCLE,
        )
        turn_amt = rate_limit(target_turn, last_turn_amt, STEERING_MAX_STEP_DUTY)

        pwm_serv.ChangeDutyCycle(turn_amt)

        # Stop sign detection
        # First marker: pause once. Second marker (after a tick buffer so the
        # first marker isn't double-counted): require several consecutive
        # positive frames, then stop permanently.
        skip_drive_this_tick = False
        should_check_stop = counter % STOP_CHECK_INTERVAL == 0
        if should_check_stop:
            if not passed_first_stop:
                if is_stop:
                    pwm_esc.ChangeDutyCycle(ESC_NEUTRAL_DUTY_CYCLE)
                    applied_speed = ESC_NEUTRAL_DUTY_CYCLE
                    wait(FIRST_STOP_WAIT_SEC)
                    passed_first_stop = True
                    second_stop_tick = counter
                    skip_drive_this_tick = True
            elif counter > second_stop_tick + SECOND_STOP_TICK_BUFFER:
                if is_stop:
                    final_stop_hits += 1
                else:
                    final_stop_hits = max(final_stop_hits - 1, 0)

                if final_stop_hits >= FINAL_STOP_CONFIRM_FRAMES:
                    pwm_esc.ChangeDutyCycle(ESC_NEUTRAL_DUTY_CYCLE)
                    applied_speed = ESC_NEUTRAL_DUTY_CYCLE
                    print("Reached final stop")
                    break

        if skip_drive_this_tick:
            applied_speed = ESC_NEUTRAL_DUTY_CYCLE
        else:
            applied_speed = ESC_LOW_DUTY_CYCLE

        # Record histories for the post-run plots.
        p_vals.append(proportional)
        d_vals.append(derivative)
        err_vals.append(smoothed_error)
        speed_vals.append(applied_speed)
        steer_vals.append(turn_amt)

        if not skip_drive_this_tick:
            pwm_esc.ChangeDutyCycle(applied_speed)
            wait(LOOP_DRIVE_WAIT_SEC)

        if cv2.waitKey(1) == 27:  # ESC key to quit
            break

        last_error = smoothed_error
        last_turn_amt = turn_amt
        last_time = now
        counter += 1

    # Shutdown
    reset_car()
    video.release()
    cv2.destroyAllWindows()

    plot_pd(p_vals, d_vals, err_vals)
    plot_pwm(speed_vals, steer_vals, err_vals)


if __name__ == "__main__":
    main()

gpiod_driver_encoder.c

C/C++
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/timekeeping.h>
#include <linux/ktime.h>
#include <linux/moduleparam.h>
#include <linux/hrtimer.h>
#include <linux/interrupt.h>
#include <linux/of.h>


// The code is cited from source: https://www.hackster.io/m-e-g-g/the-magnificent-m-e-g-g-car-28ec89#code


// Init variables
unsigned int irq_number;
static struct gpio_desc *my_enc;
static volatile u64 prev_time;
static volatile u64 curr_time;

static int encoder_val;
module_param(encoder_val, int, 0644);

// Interrupt Handler
// Fires on each rising edge from the optical encoder (see enc_probe).
// Stores the elapsed nanoseconds between valid edges into `encoder_val`,
// which userspace reads through the module parameter in sysfs.
static irq_handler_t gpio_irq_handler(unsigned int irq, void *dev_id, struct pt_regs *regs) {

	// Ensure time difference between interrupts is greater than 1 ms 
	int enc_val = gpiod_get_value(my_enc); 
	curr_time = ktime_get_ns();
	unsigned long elapsed_time = curr_time - prev_time;
	if (enc_val == 1 && elapsed_time > 1000000) {
		prev_time = curr_time;
		encoder_val = elapsed_time;	/* ns between edges, truncated to int */
		printk(KERN_INFO "irq_handler - timing detected: %lu", elapsed_time);
	}
	
	return (irq_handler_t) IRQ_HANDLED; 
}


// probe function
// Runs when the platform bus matches this driver against the overlay node
// (compatible = "a_second_name"). Claims the encoder GPIO, registers its
// rising-edge interrupt, and seeds the timestamp used by the handler.
static int enc_probe(struct platform_device *pdev)
{
	struct device *dev = &pdev->dev;
	int ret;

	printk("enc_probe - RUNNING v5\n");

	// Get the line named "lights2-gpios" from the overlay node
	my_enc = gpiod_get(dev, "lights2", GPIOD_IN);
	if (IS_ERR(my_enc)) {
		printk("enc_probe - could not gpiod_get_index 0 for btn\n");
		return PTR_ERR(my_enc);  /* propagate the real errno, not -1 */
	}

	// Setup interrupt & debounce
	// NOTE(review): gpiod_set_debounce takes microseconds, so 1000000 is a
	// full second — confirm intended; many GPIO controllers don't support
	// debounce at all and the (ignored) call just returns an error.
	irq_number = gpiod_to_irq(my_enc);
	gpiod_set_debounce(my_enc, 1000000);

	ret = request_irq(irq_number, (irq_handler_t) gpio_irq_handler,
			  IRQF_TRIGGER_RISING, "my_gpio_irq", NULL);
	if (ret != 0) {
		printk("Error!\nCan not request interrupt nr.: %d\n", irq_number);
		gpiod_put(my_enc);  /* don't leak the descriptor on failure */
		return ret;
	}
	prev_time = ktime_get_ns();

	printk("enc_probe - SUCCESS\n");
	return 0;
}

// remove function
static void enc_remove(struct platform_device *pdev)
{
	free_irq(irq_number, NULL);
	printk("enc_remove - Freed interrupt & Removing\n");
	return;
}

// Device-tree match table; the compatible string must equal the one in the
// overlay's my_device_jerry node.
static struct of_device_id matchy_match[] = {
    { .compatible = "a_second_name", },
	{},  /* sentinel */
};
// Export the match table so the module can be autoloaded when the overlay's
// node appears (the original omitted this, requiring a manual insmod).
MODULE_DEVICE_TABLE(of, matchy_match);

// platform driver object
static struct platform_driver my_driver = {
	.probe	 = enc_probe,
	.remove	 = enc_remove,
	.driver	 = {
	       .name  = "The Rock: this name doesn't even matter",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match,
	},
};
module_platform_driver(my_driver);

MODULE_ALIAS("platform:my_driver");
MODULE_DESCRIPTION("424\'s finest");
MODULE_AUTHOR("GregoryC, GiovanniM, MatthewK, EmannuelZ");
MODULE_LICENSE("GPL v2");

gpiod_driver_encoder.mod.c

C/C++
/*
 * AUTO-GENERATED by the kernel build's modpost stage — do not edit by hand.
 * Declares the module's `struct module` instance and the symbol-version
 * (CRC) table used by the loader to check ABI compatibility.
 */
#include <linux/module.h>
#include <linux/export-internal.h>
#include <linux/compiler.h>

MODULE_INFO(name, KBUILD_MODNAME);

__visible struct module __this_module
__section(".gnu.linkonce.this_module") = {
	.name = KBUILD_MODNAME,
	.init = init_module,
#ifdef CONFIG_MODULE_UNLOAD
	.exit = cleanup_module,
#endif
	.arch = MODULE_ARCH_INIT,
};



/* CRCs of every external symbol the module imports, checked at insmod. */
static const struct modversion_info ____versions[]
__used __section("__versions") = {
	{ 0xf3ebda75, "__platform_driver_register" },
	{ 0xc1514a3b, "free_irq" },
	{ 0x122c3a7e, "_printk" },
	{ 0xa7e8310, "gpiod_get" },
	{ 0xa6b98038, "gpiod_to_irq" },
	{ 0xf547a92e, "gpiod_set_debounce" },
	{ 0x92d5838e, "request_threaded_irq" },
	{ 0xb43f9365, "ktime_get" },
	{ 0x31a2770b, "gpiod_get_value" },
	{ 0xcf825a00, "platform_driver_unregister" },
	{ 0xa8fa7cc2, "param_ops_int" },
	{ 0x47e64c59, "module_layout" },
};

MODULE_INFO(depends, "");


MODULE_INFO(srcversion, "0ABADD2C52CA8791FC32757");

yolo.py

Python
# cited from https://www.hackster.io/t-e-a-m/t-e-a-m-self-driving-car-4caac5
#
# Run YOLOv5 object detection on a live camera feed, show the annotated
# frames, and save them to output.mp4 at the measured average framerate.
#
# Fixes vs. the original:
#  * `except KeyboardInterrupt or Exception as e:` parsed as
#    `except KeyboardInterrupt` only — other exceptions were never caught.
#  * Cleanup / FPS computation / video export only ran on Ctrl-C; a normal
#    loop exit (camera failure or 'q') never saved the video. Moved to finally.
#  * Division by zero when no frame periods were recorded.
import cv2
import math
import numpy as np
import video_processing as vp
import time
import torch
import matplotlib as plt
from ultralytics import YOLO

frames = []   # annotated frames retained in memory for post-run export
periods = []  # seconds between successive processed frames (for average FPS)

video = None
try:
    # Initialize video, process 320 x 320 resolution.
    video = vp.init_video()
    video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
    video.set(cv2.CAP_PROP_FRAME_HEIGHT, 320)

    # Object recognition model (pretrained YOLOv5-small via torch.hub).
    model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    model.eval()

    t = time.time()

    # Main loop for video processing code.
    while True:
        # Read video frame.
        ret, frame = video.read()
        if not ret:
            break

        # YOLO expects RGB input; OpenCV delivers BGR.
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = model(img)

        # Draw detections
        for x1, y1, x2, y2, conf, cls in results.xyxy[0]:
            x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
            label = f"{model.names[int(cls)]} {conf:.2f}"
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        # Store frame in memory
        frames.append(frame.copy())

        # Display the frame
        cv2.imshow("YOLOv5", frame)

        # Framerate tracking
        nt = time.time()
        periods.append(nt - t)
        t = nt

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

# handle keyboard shutdown gracefully
except KeyboardInterrupt:
    print("Interrupted by user")
finally:
    # Clean up video processing resources regardless of how the loop ended.
    if video is not None:
        video.release()
    cv2.destroyAllWindows()

    # Discard the first period: it includes model warm-up time.
    if len(periods) > 1:
        periods.pop(0)

    if frames and periods:
        # Calculate average framerate.
        ave_period = sum(periods) / len(periods)
        ave_fps = 1 / ave_period if ave_period > 0 else 30.0
        print("Average framerate: {:.2f}".format(ave_fps))

        # Save video using average framerate
        frame_height, frame_width = frames[0].shape[:2]
        size = (frame_width, frame_height)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter('output.mp4', fourcc, ave_fps, size)

        # Write out all frames to an mp4 file
        for f in frames:
            out.write(f)

        out.release()
        print("Video saved as output.mp4")

Credits

Daniel Merklen Pimenta De Medeiros
1 project • 0 followers
Hong-Jyun Wang
1 project • 0 followers
Ayush S.
1 project • 0 followers
Leo Marek
1 project • 0 followers
Kaan Yilmaz
1 project • 0 followers

Comments