Chufan QiuGavin JingWanli Lu
Published

ELEC553_Final_Project: Team_ ZZZ

Our autonomous car uses a Raspberry Pi 5 to perform real-time lane keeping, speed control, and stop detection for a self-driving experience.

AdvancedFull instructions provided119
ELEC553_Final_Project: Team_ ZZZ

Things used in this project

Hardware components

Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
×1
optical speed encoder
×1
Portable Charger
×1

Software apps and online services

OpenCV
OpenCV

Story

Read more

Code

main.py

Python
# main.py
from time import sleep
from threading import Thread
from motor_pid import * # Import classes and functions from motor_pid.py
from hardware_pwm import * # Import classes and functions from hardware_pwm.py
from pid_debug import *
from path_detect import *
# from plot import *

PIN_servo = "12"
PIN_motor = "13"
PERIOD = 20000000
DUTY_CYCLE_21 = int(PERIOD * 0.2142)

LOG_FILE = "/home/BURIBURI/Desktop/duty.txt"

if __name__ == "__main__":
# Initialize the camera and the servo
    servo_init()
    disp_thread = DisplayThread()
    disp_thread.start()
    while not disp_thread.windows_created:
        time.sleep(0.1)
    camera = CameraAnalysis()
    Thread(target=run_camera_scheduler, args=(camera,)).start()
    
# Initialize the motor and PID
    motor_init()
    motor_pid = EncoderSpeedMonitor()  # Create an instance of the speed monitor
    motor_pid.start()  # Start the monitoring thread
#     debugger = PIDDebug(motor_pid) # Create a debugging platform
    
    # Set target speed (1.0 m/s exampleqw)
    motor_pid.target_speed = 0.25
    motor_pid.Kp = 1.00
    motor_pid.Ki = 0.032
    motor_pid.Kd = 0.002
    
    # Start the debugging window
#     debugger.start()
    # Start the scheduler thread
    Thread(target=run_pid_scheduler, args=(pid_scheduler, 0, motor_pid)).start() #Start the PID running thread
    # Create drawing instances
#     plotter = DutyCyclePlotter(
#         motor_monitor=motor_pid,
#         camera=camera,
#         mode='motor',
#         y_range=(20, 22),
#         y_tick_interval=0.5)
#     Thread(target=run_plot_scheduler, args=(plot_scheduler, 0, plotter)).start()
    # Keep the main thread running
    try:
#         pwmset(1, "duty_cycle", DUTY_CYCLE_21)
        with open(LOG_FILE, "a") as f:
            if f.tell() == 0:
                f.write("timestamp,motor_duty,servo_duty\n")
                
            while True:
#             plotter.update()
#             time.sleep(0.01)
                motor_duty = motor_pid.duty_cycle
                # servo_duty = camera.duty_cycle * 100
                timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
                # log_line = f"{timestamp},{motor_duty:.2f},{servo_duty:.2f}\n"
                # f.write(log_line)
                # f.flush()
                # print(f"\r motor_duty: {motor_duty:.2f} servo_duty: {servo_duty:.2f}",end="")
#             debugger.update()
#             print(f"\r current_speed: {monitor.current_speed:.2f}",end="")
#             print(f"\r current_speed: {motor_pid.current_speed:.2f} duty_cycle: {motor_pid.duty_cycle:.2f}", end="")
    
    except KeyboardInterrupt:
        print("The program has exited safely")

hardware_pwm.py

Python
import os
from time import sleep
import numpy as np

moving_flag = "none"

PIN_servo = "12"
PIN_motor = "13"
PERIOD = 20000000

DUTY_CYCLE_7_5 = int(PERIOD * 0.075)#servo/motor_middle_value Duty=7.5
DUTY_CYCLE_7_7 = int(PERIOD * 0.077)
DUTY_CYCLE_5 = int(PERIOD * 0.05)#servo_parameter_left_side/#motor_parameter_low_speed
DUTY_CYCLE_0_1 = int(PERIOD * 0.000001)

DUTY_CYCLE_9_7 = int(PERIOD * 0.097)#servo_parameter_right_side

DUTY_CYCLE_10 = int(PERIOD * 0.1)#motor_parameter_high_speed
DUTY_CYCLE_8 = int(PERIOD * 0.08)
DUTY_CYCLE_6_5 = int(PERIOD * 0.065)
#Test data
DUTY_CYCLE_20 = int(PERIOD * 0.20)
DUTY_CYCLE_21 = int(PERIOD * 0.21)
DUTY_CYCLE_21_2 = int(PERIOD * 0.215)
DUTY_CYCLE_22 = int(PERIOD * 0.22)
DUTY_CYCLE_40 = int(PERIOD * 0.40)
DUTY_CYCLE_50 = int(PERIOD * 0.50)
DUTY_CYCLE_60 = int(PERIOD * 0.60)

NODE = "/sys/class/pwm/pwmchip0"

def get_channel(pin):
    channel_map = {
        "12": 0,
        "13": 1,
#         "18": 2,
#         "19": 3
    }
    return channel_map.get(pin)

def pwmset(channel, file, value):
    with open(f"{NODE}/pwm{channel}/{file}", "w") as f:
        f.write(str(value))

def configure_pwm(pin, period, duty_cycle):
    channel = get_channel(pin)
    if channel is None:
        print(f"Unknown pin {pin}.")
        return
    
    if not os.path.isdir(f"{NODE}/pwm{channel}"):
        with open(f"{NODE}/export", "w") as f:
            f.write(str(channel))
    
    pwmset(channel, "period", period)
    pwmset(channel, "duty_cycle", duty_cycle)
    pwmset(channel, "enable", 1)

def servo_init():
    pwmset(0, "duty_cycle", DUTY_CYCLE_7_5)
    sleep(1)

def motor_init():
#     pwmset(1, "duty_cycle", 0)
#     sleep(2)
    pwmset(1, "duty_cycle", DUTY_CYCLE_21)
    sleep(2)
#     pwmset(3, "duty_cycle", DUTY_CYCLE_5)
#     sleep(2)
#     pwmset(3, "duty_cycle", DUTY_CYCLE_7_5)

def motor_forward():
    global moving_flag
    moving_flag = "forward"
    pwmset(1, "duty_cycle", DUTY_CYCLE_21)
    sleep(0.5)
    pwmset(1, "duty_cycle", DUTY_CYCLE_21_2)
  
def motor_reverse():
    global moving_flag
    moving_flag = "reverse"
    pwmset(1, "duty_cycle", DUTY_CYCLE_21)
    sleep(0.5)
    pwmset(1, "duty_cycle", DUTY_CYCLE_20)
    
def motor_stop(flag):
#     if flag == 1: # move forward
#         pwmset(1, "duty_cycle", 0)
#         sleep(2)
        
#     if flag == 1: # stop
#         pwmset(1, "duty_cycle", DUTY_CYCLE_22)
#         sleep(0.25)
# #         pwmset(3, "duty_cycle", DUTY_CYCLE_20)
# #         sleep(1)
#         pwmset(1, "duty_cycle", DUTY_CYCLE_21)
    print("stop")

def stop_pwm(pin):
    channel = get_channel(pin)
    if channel is None:
        print(f"Unknown pin {pin}.")
        return
    
    pwmset(channel, "enable", 0)
    with open(f"{NODE}/unexport", "w") as f:
        f.write(str(channel))

if __name__ == "__main__":
 
    configure_pwm(PIN_servo, PERIOD, DUTY_CYCLE_7_5)
    configure_pwm(PIN_motor, PERIOD, 0)
#     sleep(2)
#     motor_init()
#     sleep(3)
    try:
        print("Start")
        while True:
#             pwmset(1, "duty_cycle", DUTY_CYCLE_21_2)
        
            pwmset(0, "duty_cycle", int(PERIOD * 0.085))
#         sleep(3)
#         motor_stop(moving_flag)
#         sleep(2)
#         motor_reverse()
#         sleep(3)
#         motor_stop(moving_flag)
#         sleep(5)
        
#         for duty_cycle in np.arange(DUTY_CYCLE_5,DUTY_CYCLE_9_7,DUTY_CYCLE_0_1):
#             pwmset(0, "duty_cycle", duty_cycle)
#             sleep(0.00001)
#         for duty_cycle in np.arange(DUTY_CYCLE_9_7,DUTY_CYCLE_5,-DUTY_CYCLE_0_1):
#             pwmset(0, "duty_cycle", duty_cycle)
#             sleep(0.00001)
#         for duty_cycle in np.arange(DUTY_CYCLE_5,DUTY_CYCLE_7_5,DUTY_CYCLE_0_1):
#             pwmset(0, "duty_cycle", duty_cycle)
#             sleep(0.00001)
        
    finally:
        print("end")
#         stop_pwm(PIN_servo)
#         stop_pwm(PIN_motor)
        

motor_pid.py

Python
# This functionality includes reading the encoder pulse count
# to measure speed and using PID to adjust the motor speed.
# motor_pid.py

import sched
import gpiod
import time
import math
from threading import Thread, Lock
from hardware_pwm import *
import path_detect
import plot_waveform

USE_ENCODER_DRIVER=False

DUTY_CYCLE_21 = int(PERIOD * 0.21)

# Create a single scheduler instance
pid_scheduler = sched.scheduler(time.time, time.sleep)

def pid_task(sc, monitor):
    """PID control task"""
    # Get the current speed from the monitor
    current_speed = monitor.get_speed()
#     print(f"\rCurrent speed: {current_speed:.2f} m/s | Total pulses: {monitor.pulse_count}", end="")
    # Calculate PID output
    duty = monitor.compute_pid(current_speed)
    # Apply new duty cycle to motor
    monitor.set_motor_duty(duty)
    # Re-add to the scheduling queue (0.5-second interval)
    sc.enter(0.001, 1, pid_task, (sc, monitor))

def run_pid_scheduler(sc, first_delay, monitor):
    """Scheduler running function"""
    time.sleep(first_delay)  # Initial delay
    sc.enter(0, 1, pid_task, (sc, monitor))  # Start the first task immediately
    sc.run()

class EncoderSpeedMonitor:
    def __init__(self, gpio_chip="/dev/gpiochip0", gpio_line_offset=17, 
                 wheel_diameter_m=0.025, encoder_edges_per_rev=40):
        # Hardware parameters
        self.WHEEL_CIRCUMFERENCE = math.pi * wheel_diameter_m
        self.DISTANCE_PER_PULSE = self.WHEEL_CIRCUMFERENCE / encoder_edges_per_rev

        if USE_ENCODER_DRIVER==False:
            # GPIO configuration
            self.chip = gpiod.Chip(gpio_chip)
            self.line = self.chip.get_line(gpio_line_offset)
            self.line.request(consumer="speed_monitor", 
                            type=gpiod.LINE_REQ_EV_RISING_EDGE)
        else:
            self.curr_encoder_val=0
            self.pre_encoder_val=0
      
        # State variables
        self.pulse_count = 0
        self.lock = Lock()
        self.running = False
        self.thread = None
        self.last_pulse_time = 0
        self.DEBOUNCE_TIME = 0.0001
        
        # PID control parameters
        self.target_speed = 0.00        # Target speed in m/s
        self.current_speed = 0.00		# Current speed in m/s
        self.Kp = 0.00                 # Proportional gain
        self.Ki = 0.00                # Integral gain
        self.Kd = 0.00                 # Derivative gain
        self.prev_error = 0.00         # Previous error for differential term
        self.prev_prev_error = 0.00
        self.integral = 0.00           # Integral accumulator
        self.last_duty = 21.00         # Last duty cycle value (21% when stopped)
        self.duty_cycle = 0.00
        
        # PWM constraints
        self.MIN_DUTY = 21.00         # Minimum duty cycle to maintain rotation
        self.MAX_DUTY = 21.60         # Maximum allowed duty cycle

    def _pulse_monitor(self):
        """Pulse monitoring thread core"""
        while self.running:
            if USE_ENCODER_DRIVER==False:
                # if self.line.event_wait(sec=0.1):  # Check every 100ms
                event = self.line.event_read()
                current_time = time.time()
                if current_time - self.last_pulse_time > self.DEBOUNCE_TIME:
                    with self.lock:
                        self.pulse_count += 1
                    self.last_pulse_time = current_time
            else:
                with open("/sys/module/encoder_driver/parameters/encoder_val", "r") as f:
                    encoder_val = f.readlines()
                    self.curr_encoder_val=int(encoder_val[0].strip())
                if self.curr_encoder_val!=self.pre_encoder_val:
                    self.pre_encoder_val=self.curr_encoder_val
                    self.pulse_count += 1
                

    def start(self):
        """Start monitoring"""
        self.running = True
        self.thread = Thread(target=self._pulse_monitor)
        self.thread.start()
        print("Pulse monitoring started")

    def stop(self):
        """Stop monitoring"""
        self.running = False
        self.thread.join()
        if USE_ENCODER_DRIVER==False:
            self.line.release()
            self.chip.close()
        print("Resources released")

    def get_speed(self, interval=0.02):
        """Get current speed"""
        with self.lock:
            start_count = self.pulse_count
      
        time.sleep(interval)
      
        with self.lock:
            delta = self.pulse_count - start_count
            return (delta * self.DISTANCE_PER_PULSE) / interval  # m/s

    def compute_pid(self, current_speed):
        """Implement incremental PID control algorithm"""
        # Calculate current error
        self.current_speed = current_speed
        error = self.target_speed - current_speed
      
        # Calculate PID terms
        motor_P = self.Kp * (error - self.prev_error)
        motor_I = self.Ki * error  # Accumulate integral term
        motor_D = self.Kd * (error - 2*self.prev_error + self.prev_prev_error)
      
        # Calculate total output
        self.duty_cycle = motor_P + motor_I + motor_D + self.last_duty
      
        # Apply output constraints
        self.duty_cycle = max(self.MIN_DUTY, min(self.duty_cycle, self.MAX_DUTY))
      
        # Update state variables
        self.prev_prev_error = self.prev_error
        self.prev_error = error
        self.last_duty = self.duty_cycle

        plot_waveform.plot_p_vals.append(motor_P)
        plot_waveform.plot_d_vals.append(motor_D)
        plot_waveform.plot_speed_pwm.append(self.duty_cycle)
      
        return self.duty_cycle
    def set_motor_duty(self, duty):
        """Set motor PWM duty cycle (placeholder - implement hardware interface)"""
        # Replace with actual PWM control code
        self.duty_cycle = duty
        if path_detect.STOP_FLAG==1:
            print("stop sign pid 1")
            pwmset(1, "duty_cycle", int(0))
            sleep(2)
        elif path_detect.STOP_FLAG==2:
            print("stop sign pid 2")
            sleep(4)
            pwmset(1, "duty_cycle", int(0))
            print(plot_waveform.plot_p_vals,'\n',plot_waveform.plot_d_vals,'\n',plot_waveform.plot_err_vals,'\n',plot_waveform.plot_speed_pwm,'\n',plot_waveform.plot_steer_pwm,'\n')
            plot_waveform.plot_pwm(plot_waveform.plot_speed_pwm, plot_waveform.plot_steer_pwm, plot_waveform.plot_err_vals, show_img=False)
            plot_waveform.plot_pd(plot_waveform.plot_p_vals, plot_waveform.plot_d_vals, plot_waveform.plot_err_vals, show_img=False)
            while(1):
                1
        else:    
            pwmset(1, "duty_cycle", int(duty * 0.01 * PERIOD))
        # Example using RPi.GPIO:
        # self.pwm.ChangeDutyCycle(duty)
#         print(f"Applying duty: {duty:.1f}%")  # Debug output



# if __name__ == "__main__":
#     monitor = EncoderSpeedMonitor()
#     monitor.start()
#     # Start the single timer (immediate start, 2-second interval)
#     Thread(target=run_scheduler, args=(scheduler, 0, monitor)).start()
# 
#     # Keep the main thread running
#     try:
#         while True:
#             time.sleep(1)
#     except KeyboardInterrupt:
#         print("The program has exited safely")

path_detect.py

Python
import cv2
import numpy as np
import matplotlib.pyplot as plt
import platform
import time
import threading
from threading import Thread
from hardware_pwm import *
import plot_waveform

# https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992

# this flag is used to control whether stop is required
STOP_FLAG=0
# Global synchronization primitives
exit_event = threading.Event()
display_queue = []
display_lock = threading.Lock()

class DisplayThread(threading.Thread):
    """Handles all GUI operations and visualization"""
    def __init__(self):
        super().__init__()
        self.windows_created = False

    def run(self):
        """Main display loop with all visualization windows"""
        cv2.namedWindow("Trackbars", cv2.WINDOW_NORMAL)
        cv2.resizeWindow("Trackbars", 500, 600)
        self.create_trackbars()
      
        # Create all visualization windows
        # cv2.namedWindow("blue_mask", cv2.WINDOW_NORMAL)
        self.windows_created = True

        while not exit_event.is_set():
            with display_lock:
                if display_queue:
                    for name, img in display_queue:
                        cv2.imshow(name, img)
                    display_queue.clear()
          
            if cv2.waitKey(25) & 0xFF == ord('q'):
                exit_event.set()
            time.sleep(0.02)

        cv2.destroyAllWindows()

    def create_trackbars(self):
        """Initialize parameter adjustment interface"""
        cv2.createTrackbar("H Low", "Trackbars", 0, 179, lambda x: None)
        cv2.createTrackbar("S Low", "Trackbars", 70, 255, lambda x: None)
        cv2.createTrackbar("V Low", "Trackbars", 120, 255, lambda x: None)
        cv2.createTrackbar("H High", "Trackbars", 179, 179, lambda x: None)
        cv2.createTrackbar("S High", "Trackbars", 255, 255, lambda x: None)
        cv2.createTrackbar("V High", "Trackbars", 255, 255, lambda x: None)

class CameraAnalysis:
    """Main processing class for lane detection and analysis"""
    def __init__(self):
        # setup frame params
        resize= 0.2
        self.width,self.height = [1280, 720]
        self.resized_width, self.resized_height=[int(self.width*resize),int(self.height*resize)]
        self.roi_y1,self.roi_y2,self.roi_x1,self.roi_x2=[self.resized_height //3,self.resized_height *2//3, 0,self.resized_width-10]
        self.roi_width,self.roi_height=(self.roi_x2-self.roi_x1), (self.roi_y2-self.roi_y1)
        print(self.roi_x2,self.roi_x1,self.roi_y2,self.roi_y1)
        # setup hsv threshold
        self.lower_blue = np.array([60, 40, 100])
        self.upper_blue = np.array([120, 255, 255])
        self.lower_red = np.array([0, 70, 120])
        self.upper_red = np.array([179, 255, 255])
        # setup variable to determine how many times stoped 
        self.detected_cnt=0
        self.previous_detected_time=0
        # initialize camera
        self.initialize_camera()
        self.frame_count=0

    def initialize_camera(self):
        """Configure camera source based on platform"""
        if platform.system() == 'Windows':
            self.capture = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        else:
            self.capture = cv2.VideoCapture(0)
            self.capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
          
        if not self.capture.isOpened():
            raise RuntimeError("Failed to initialize video source")
        # setup capture size
        self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
        self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)

    def __del__(self):
        """Release camera resources"""
        self.capture.release()

    def process_frame(self):
        """Main image processing pipeline"""
        ret, frame = self.capture.read()
        self.frame_count+=1
        # print("frame_count=",self.frame_count)
        if not ret or self.frame_count==400:
            print("Frame is lost or Frame count limit is met\n")
            exit_event.set()
            return None

        """Image preprocessing and ROI extraction"""
        self.resized_img = cv2.resize(frame, (self.resized_width, self.resized_height)) # resize the img
        self.roi = self.resized_img[self.roi_y1:self.roi_y2,self.roi_x1:self.roi_x2]    # get roi
        self.hsv_img = cv2.cvtColor(self.roi, cv2.COLOR_BGR2HSV)                        # convert to HSV
        self.queue_display("resized_img", self.resized_img)
        # Line detection
        deviation=self.detect_lane_lines()
        # Stop sign detection
        self.stop_sign_detection()
        return deviation
    
    def detect_lane_lines(self):
        """Lane line detection"""      
        # get blue region
        self.blue_mask = cv2.inRange(self.hsv_img, self.lower_blue,self.upper_blue)
        self.blue_mask = cv2.morphologyEx(self.blue_mask, cv2.MORPH_OPEN, np.ones((5,5), np.uint8))

        # Iterate over each pixel
        vote1_x_left=0
        vote1_i_left=0
        vote1_x_right=0
        vote1_i_right=0
        for y in range(self.roi_height-3,self.roi_height):
            for x in range(self.roi_width):
                pixel_value  = self.blue_mask[y, x]
                if pixel_value == 255: # white
                    if x<self.roi_width//2: # left line
                        vote1_x_left+=x
                        vote1_i_left+=1
                    else: # right line
                        vote1_x_right+=x
                        vote1_i_right+=1
        # print(vote1_i_left,vote1_i_right)
        if vote1_i_left>30 and vote1_i_right >30: # see both lines
            target_x=(vote1_x_left//vote1_i_left+vote1_x_right//vote1_i_right)//2
#             print("[both]target_x",target_x)
        elif vote1_i_left<=30 and vote1_i_right>0: # only see the right line
            target_x=vote1_x_right//vote1_i_right-self.roi_width//4
#             print("[right]target_x",target_x)
            if target_x < 0:
                target_x=0
        elif vote1_i_right<=30 and vote1_i_left>0: # only see the left line
            target_x=vote1_x_left//vote1_i_left+self.roi_width//4
#             print("[left]target_x",target_x)
            if target_x > self.roi_width:
                target_x=self.roi_width-1
        else: # see neither line
            target_x=self.roi_width//2
        # draw the target
        cv2.line(self.roi, (target_x,self.roi_height), (target_x,self.roi_height), (0,0,255),8)
        self.queue_display("blue_mask", self.blue_mask)
        self.queue_display("roi_target", self.roi)
        return target_x-self.roi_width//2
    
    def stop_sign_detection(self):
        # lower, upper = self.read_parameters()
        # further resize the frame to accelerate speed 
        red_hsv_img=cv2.resize(self.hsv_img, (self.roi_width//5,self.roi_height//5))
        resize_blue_mask=cv2.resize(self.blue_mask, (self.roi_width//5,self.roi_height//5))
        # get red region
        red_mask = cv2.inRange(red_hsv_img, self.lower_red,self.upper_red)
        red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_OPEN, np.ones((3,3), np.uint8))
        diffImg = cv2.subtract(red_mask,resize_blue_mask)
        # self.queue_display("red_mask", red_mask)
        self.queue_display("diffImg", diffImg)
        
        vote1_i_red1=0
        vote1_i_red2=0
        for x in range(self.roi_width//5):
            pixel_value1  = diffImg[self.roi_height//5-1, x]
            pixel_value2  = diffImg[0, x]
            if pixel_value1 == 255: # white
                vote1_i_red1+=1
            if pixel_value2 == 255: # white
                vote1_i_red2+=1
        global STOP_FLAG
        STOP_FLAG=0         
        if vote1_i_red1>6 and vote1_i_red2<6: # stop sign detected
            current_time = int(time.time())
            # print(f"current_time {current_time}")
            if current_time-self.previous_detected_time>7: # make sure not to detect the same stop sign
                print("stop sign path")
                self.detected_cnt+=1
                self.previous_detected_time=current_time
                if self.detected_cnt==2: # permanent stop
                    STOP_FLAG=2
                else:
                    STOP_FLAG=1

    def read_parameters(self):
        """Thread-safe parameter reading from trackbars"""
        h_low = cv2.getTrackbarPos("H Low", "Trackbars")
        s_low = cv2.getTrackbarPos("S Low", "Trackbars")
        v_low = cv2.getTrackbarPos("V Low", "Trackbars")
        h_high = cv2.getTrackbarPos("H High", "Trackbars")
        s_high = cv2.getTrackbarPos("S High", "Trackbars")
        v_high = cv2.getTrackbarPos("V High", "Trackbars")
        lower  = np.array([h_low, s_low, v_low])
        upper = np.array([h_high, s_high, v_high])
        return lower , upper

    def queue_display(self, name, frame):
        """Thread-safe display queue update"""
        with display_lock:
            display_queue.append((name, frame))
        
def run_camera_scheduler(camera):
    """Main processing thread workflow"""
    while not exit_event.is_set():
        deviation=camera.process_frame()
        if deviation is None:
            break
        servo_duty = int((0.075 + deviation*0.0005) * PERIOD)
        pwmset(0, "duty_cycle", servo_duty)
        plot_waveform.plot_err_vals.append(deviation/100)
        plot_waveform.plot_steer_pwm.append(servo_duty/PERIOD*100)
        # print(f"Steering deviation: {deviation} pixels,{servo_duty/PERIOD} %duty")
        time.sleep(0.1)

if __name__ == "__main__":
    disp_thread = DisplayThread()
    disp_thread.start()
    
    while not disp_thread.windows_created:
        time.sleep(0.1)

    try:
        camera = CameraAnalysis()
        proc_thread = Thread(target=run_camera_scheduler, args=(camera,))
        proc_thread.start()

        while not exit_event.is_set():
            time.sleep(1)
          
    except KeyboardInterrupt:
        exit_event.set()
    finally:
        exit_event.set()
#         proc_thread.join()
#         disp_thread.join()
#         del camer

encoder_driver.c

C/C++
// This file has code taken from and influenced by the following sources:
// User raja_961, Autonomous Lane-Keeping Car Using Raspberry
// Pi and OpenCV. Instructables. URL:
// https://www.instructables.com/Autonomous-Lane-Keeping-Car-U
// sing-Raspberry-Pi-and
// other reference
// https://www.hackster.io/big-brains/big-brains-elec-424-final-project-team-11-53c862
// https://www.hackster.io/m-e-g-g/the-magnificent-m-e-g-g-car-28ec89

#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
// timer and interrupt header files 
#include <linux/ktime.h>
#include <linux/hrtimer.h>
#include <linux/interrupt.h>

// Timer variables
ktime_t prev_time; // the time of first detection
ktime_t curr_time; // the time of second detection
s64 delta_time; // the duration between two detections

// the value to be read from main
static int encoder_val;
module_param(encoder_val, int, 0644);

/** variable contains pin number o interrupt controller to which GPIO 17 is mapped to */
unsigned int irq_number;
// variables related to encoder, which will be used in the gpiod function
struct gpio_desc *encoder;

// Interrupt service routine is called, when interrupt is triggered
static irqreturn_t gpio_irq_handler(int irq, void *dev_id)
{
	// debounce or first detection
    curr_time=ktime_get_ns();
    delta_time=curr_time-prev_time;
    if (delta_time<300000){ // less than 1ms -> debounce
        // printk("Debounce: delta_time: %lld ns\n",delta_time);
        ;
    }else{ // more than 1ms -> first detection
        prev_time=curr_time; // record the time of first detection
        encoder_val=delta_time;
        printk("delta_time: %lld ns\n",delta_time);
    }
	return IRQ_HANDLED;
}

// probe function
static int encoder_probe(struct platform_device *pdev)
{
	printk("Enter encoder probe\n");
	// init encoder GPIO
	encoder=devm_gpiod_get(&pdev->dev, "encoder", GPIOD_IN);
    if(IS_ERR(encoder)) {
		printk("devm_gpiod_get failed\n");
		return -1;
	}
	// Setup the interrupt
	irq_number = gpiod_to_irq(encoder);
	if(request_irq(irq_number, gpio_irq_handler, IRQF_TRIGGER_FALLING, "my_gpio_irq", NULL) != 0){
		printk("Error!\nCan not request interrupt nr.: %d\n", irq_number);
		return -1;
	}
	printk("GPIO 17 is mapped to IRQ Nr.: %d\n", irq_number);
	prev_time = ktime_get_ns();
	return 0;
}

// remove function
static int encoder_remove(struct platform_device *pdev)
{
	// Free the irq and print a message
	free_irq(irq_number, NULL);
    printk("encoder_remove\n");
	return 0;
}

static struct of_device_id matchy_match[] = {
    {.compatible="mycompany,myencoder",},
    {/* leave alone - keep this here (end node) */},
};

// platform driver object
static struct platform_driver adam_driver = {
	.probe	 = encoder_probe,
	.remove	 = encoder_remove,
	.driver	 = {
	       .name  = "The Rock: this name doesn't even matter",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match,
	},
};

module_platform_driver(adam_driver);

MODULE_DESCRIPTION("Final Project");
MODULE_AUTHOR("gj20");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");

encoder_gpio17.dts

C/C++
/dts-v1/;
/plugin/;

/ {
    fragment@0 {
        target-path = "/";
        __overlay__ {
            my_encoder_device {
                compatible = "mycompany,myencoder";
                encoder-gpios = <&gpio 17 0>;  
            };
        };
    };
};

pi_opencv.py

Python
import cv2
import numpy as np
import platform
import time
import threading
from threading import Thread
# from hardware_pwm import *

# Global synchronization primitives
exit_event = threading.Event()
display_queue = []
display_lock = threading.Lock()

class DisplayThread(threading.Thread):
    """Handles all GUI operations and visualization"""
    def __init__(self):
        super().__init__()
        self.windows_created = False


    def run(self):
        """Main display loop with all visualization windows"""
        cv2.namedWindow("Trackbars", cv2.WINDOW_NORMAL)
        cv2.resizeWindow("Trackbars", 500, 600)
        self.create_trackbars()
      
        # Create all visualization windows
        cv2.namedWindow("ROI", cv2.WINDOW_NORMAL)
        cv2.namedWindow("blue_mask", cv2.WINDOW_NORMAL)
        # cv2.namedWindow("canny", cv2.WINDOW_NORMAL)
        # cv2.namedWindow("fitline", cv2.WINDOW_NORMAL)
        cv2.namedWindow("target", cv2.WINDOW_NORMAL)
        self.windows_created = True

        while not exit_event.is_set():
            with display_lock:
                if display_queue:
                    for name, img in display_queue:
                        cv2.imshow(name, img)
                    display_queue.clear()
          
            if cv2.waitKey(25) & 0xFF == ord('q'):
                exit_event.set()
            time.sleep(0.02)

        cv2.destroyAllWindows()

    def create_trackbars(self):
        """Initialize parameter adjustment interface"""
        cv2.createTrackbar("H Low", "Trackbars", 60, 179, lambda x: None)
        cv2.createTrackbar("S Low", "Trackbars", 40, 255, lambda x: None)
        cv2.createTrackbar("V Low", "Trackbars", 100, 255, lambda x: None)
        cv2.createTrackbar("H High", "Trackbars", 120, 179, lambda x: None)
        cv2.createTrackbar("S High", "Trackbars", 255, 255, lambda x: None)
        cv2.createTrackbar("V High", "Trackbars", 255, 255, lambda x: None)
        cv2.createTrackbar("threshold", "Trackbars", 60, 200, lambda x: None)
        cv2.createTrackbar("minLineLength", "Trackbars", 10, 100, lambda x: None)
        cv2.createTrackbar("maxLineGap", "Trackbars", 100, 100, lambda x: None)

class CameraAnalysis:
    """Main processing class for lane detection and analysis"""
    def __init__(self):
        # setup frame params
        resize= 0.2
        self.width,self.height = [1280, 720]
        self.resized_width, self.resized_height=[int(self.width*resize),int(self.height*resize)]
        self.roi_y1,self.roi_y2,self.roi_x1,self.roi_x2=[self.height//3,self.height*2//3, 0,self.resized_width]
        self.roi_width,self.roi_height=(self.roi_x2-self.roi_x1), (self.roi_y2-self.roi_y1)
        # warpPerspective Transformation
        # pts1 = np.float32([[35, 480], [240, 0], [580, 480], [370, 0]]) # original frame
        # pts2 = np.float32([[35, 480], [35 , 0], [580, 480], [580, 0]]) # transformed frame
        # self.PTM = cv2.getPerspectiveTransform(pts1, pts2) # calculate the warpPerspective Transformation matrix
        # initialize camera
        self.initialize_camera()

    def initialize_camera(self):
        """Configure camera source based on platform"""
        if platform.system() == 'Windows':
            self.capture = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        else:
            self.capture = cv2.VideoCapture(0)
            self.capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
          
        if not self.capture.isOpened():
            raise RuntimeError("Failed to initialize video source")
      
        self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
        self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)

    def __del__(self):
        """Release camera resources"""
        self.capture.release()

    def process_frame(self):
        """Main image processing pipeline"""
        ret, frame = self.capture.read()
        if not ret:
            exit_event.set()
            return None

        """Image preprocessing and ROI extraction"""
        self.resized_img = cv2.resize(frame, (self.resized_width, self.resized_height))         # resize the img
        self.roi = self.resized_img[self.roi_y1:self.roi_y2,self.roi_x1:self.roi_x2]            # get roi
        # PTM_img = cv2.warpPerspective(self.roi, self.PTM, (self.roi_width, self.roi_height))    # warpPerspective Transformation
        self.hsv_img = cv2.cvtColor(self.roi, cv2.COLOR_BGR2HSV)                                # convert to HSV
        
        self.queue_display("resized_img", self.resized_img)
        # self.queue_display("PTM_frame", PTM_img)
        self.queue_display("ROI", self.roi)

        # Line detection
        deviation=self.analyze_lanes()

        # Stop sign detection

        return deviation

    def analyze_lanes(self):
        """Complete lane analysis workflow"""
        deviation = self.detect_lane_lines()
        # left_center = self.fit_line(left_points)
        # right_center = self.fit_line(right_points)
        # return self.calculate_deviation(left_center, right_center)
        return deviation
    
    def detect_lane_lines(self):
        """Lane line detection using color thresholding and Hough transform"""
        lower, upper, threshold, min_len, max_gap = self.read_parameters()
      
        mask = cv2.inRange(self.hsv_img, lower, upper)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((5,5), np.uint8))

        # 
        vote1_x_left=0
        vote1_i_left=0
        vote1_x_right=0
        vote1_i_right=0
        for y in range(self.roi_height-3,self.roi_height):
            for x in range(self.roi_width):
                pixel_value  = mask[y, x]
                if pixel_value == 255: # white
                    if x<self.roi_width//2:
                        vote1_x_left+=x
                        vote1_i_left+=1
                    else:
                        vote1_x_right+=x
                        vote1_i_right+=1
        print(vote1_i_left,vote1_i_right)
        if vote1_i_left>100 and vote1_i_right >100:
            target_x=(vote1_x_left//vote1_i_left+vote1_x_right//vote1_i_right)//2
            print("[both]target_x",target_x)
        elif vote1_i_left<=100 and vote1_i_right >0: # only see the right line
            target_x=vote1_x_right//vote1_i_right-self.roi_width//4
            print("[right]target_x",target_x)
            if target_x < 0:
                target_x=0
        elif vote1_i_right<=100 and vote1_i_left>0: # only see the left line
            target_x=vote1_x_left//vote1_i_left+self.roi_width//4
            print("[left]target_x",target_x)
            if target_x > self.roi_width:
                target_x=self.roi_width-1
        else:
            target_x=self.roi_width//2
        
        cv2.line(self.roi, (target_x,self.roi_height), (target_x,self.roi_height), (0,0,255),10)
        
        # edges = cv2.Canny(mask, 150, 180)
        # lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold,
        #                        minLineLength=min_len, maxLineGap=max_gap)
        self.queue_display("blue_mask", mask)
        self.queue_display("roi_withline", self.roi)
        # self.queue_display("canny", edges)
        # return self.classify_points(lines)
        return target_x-self.roi_width//2

    def fit_line(self, points):
        """Line fitting"""
        if not points or len(points) < 2:
            return None
          
        points = np.array(points).reshape(-1, 2)
        vx, vy, x0, y0 = cv2.fitLine(points, cv2.DIST_L2, 0, 0.01, 0.01).flatten()
      
        if abs(vx) < 0.001:
            cv2.line(self.roi, (int(x0),0), (int(x0),self.height), (0,0,255),2)
            center = int(x0)
        else:
            y1 = int(y0 - x0 * vy / vx)
            y2 = int(y0 + (self.width - x0) * vy / vx)
            cv2.line(self.roi, (0,y1), (self.width,y2), (0,0,255),2)
            
            if abs(vy) < 0.001:
                return self.width//2
            center = int(x0 - y0 * vx / vy)
      
        # self.queue_display("fitline", self.roi)
        return center

    def calculate_deviation(self, left, right):
        """Calculate steering deviation from lane center"""
        if left is None or right is None:
            return 0
          
        center = (left + right) // 2
        cv2.line(self.roi, (self.width//2, self.height//3+5), 
                (center,0), (255,255,0),2)
        cv2.line(self.roi, (self.width//2, self.height//3+5),
                (self.width//2,0), (255,0,255),2)
        self.queue_display("target", self.roi)
        return center - self.width//2

    def classify_points(self, lines):
        """Classify detected points into left/right lanes"""
        left, right = [], []
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]
                if x1 < self.width//2:
                    left.extend([[x1,y1], [x2,y2]])
                else:
                    right.extend([[x1,y1], [x2,y2]])
                cv2.line(self.roi, (x1,y1), (x2,y2), (0,255,0),2)
        return left, right

    def read_parameters(self):
        """Thread-safe parameter reading from trackbars"""
        h_low = cv2.getTrackbarPos("H Low", "Trackbars")
        s_low = cv2.getTrackbarPos("S Low", "Trackbars")
        v_low = cv2.getTrackbarPos("V Low", "Trackbars")
        h_high = cv2.getTrackbarPos("H High", "Trackbars")
        s_high = cv2.getTrackbarPos("S High", "Trackbars")
        v_high = cv2.getTrackbarPos("V High", "Trackbars")

        threshold = cv2.getTrackbarPos("threshold", "Trackbars")
        minLineLength = cv2.getTrackbarPos("minLineLength", "Trackbars")
        maxLineGap = cv2.getTrackbarPos("maxLineGap", "Trackbars")
    
        lower  = np.array([h_low, s_low, v_low])
        upper = np.array([h_high, s_high, v_high])
        return lower , upper , threshold, minLineLength, maxLineGap

    def queue_display(self, name, frame):
        """Thread-safe display queue update"""
        with display_lock:
            display_queue.append((name, frame))
        
def run_camera_scheduler(camera):
    """Main processing thread workflow"""
    while not exit_event.is_set():
        deviation=camera.process_frame()
        if deviation is None:
            break
        print(f"Steering deviation: {deviation} pixels")
        time.sleep(0.1)

if __name__ == "__main__":
    disp_thread = DisplayThread()
    disp_thread.start()

    while not disp_thread.windows_created:
        time.sleep(0.1)

    try:
        camera = CameraAnalysis()
        proc_thread = Thread(target=run_camera_scheduler, args=(camera,))
        proc_thread.start()

        while not exit_event.is_set():
            time.sleep(1)
          
    except KeyboardInterrupt:
        exit_event.set()
    finally:
        exit_event.set()
#         proc_thread.join()
#         disp_thread.join()
#         del camer

Credits

Chufan Qiu
1 project • 0 followers
Gavin Jing
1 project • 0 followers
Wanli Lu
1 project • 0 followers

Comments