Hardware components | ||||||
![]() |
| × | 1 | |||
![]() |
| × | 1 | |||
| × | 1 | ||||
| × | 1 | ||||
Software apps and online services | ||||||
![]() |
|
This project utilizes a Raspberry Pi for image recognition and motor control, enabling a RC car to autonomously drive along a lane and stop accurately at designated stop positions. The image processing component consists of two main parts: lane keeping and red stop sign detection. The results from these processes are used to guide motor control, which is described in detail below. The motor control system includes both servo and drive motor control. The servo controls the steering to keep the car centered within the lane, while the drive motor controls the vehicle's speed. Additionally, a photoelectric encoder is used to measure the car's speed, enabling PID-based closed-loop speed control. From a system implementation perspective, the project employs the Linux device tree to configure GPIOs. Within the kernel, edge-triggered interrupts are handled using gpiod functions to capture signals from the encoder. Since the encoder data can be noisy and unstable, a first-order Kalman filter is applied to smooth the speed readings, resulting in more stable and reliable velocity feedback. This work builds upon the foundational implementation by user raja_961, “Autonomous Lane-Keeping Car Using Raspberry Pi and OpenCV”
URL: https://www.instructables.com/Autonomous-Lane-Keeping-Car-Using-Raspberry-Pi-and/.
Other reference:
https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992
https://www.hackster.io/big-brains/big-brains-elec-424-final-project-team-11-53c862
Resolution, Proportional Gain, Derivative GainTo determine appropriate values for image resolution and PID controller gains, we adopted an iterative tuning approach informed by real-time visual feedback and system performance. Lower image resolutions (e.g., 256×144) were selected to ensure real-time processing capabilities on the Raspberry Pi 4 without sacrificing lane detection accuracy. The proportional and derivative gains (Kp, Kd) were initially set to low values and incrementally adjusted based on observed steering stability and convergence toward the lane center. To support gain tuning, we developed a custom PIDDebug visualization module using OpenCV, which plots PID responses (P, D) and system errors over time. This allowed us to empirically balance responsiveness and overshoot, ultimately enhancing the system's lane-following performance.
Lane keepingWe begin by capturing images using OpenCV and resizing them to 256x144 to accelerate processing. We extract a Region of Interest (ROI) to eliminate irrelevant parts of the image and convert the result to the HSV color space. Using thresholding and morphological operations, we remove noise and isolate the blue regions in the image, which represent the lane markings. Next, we scan the bottom few rows of the image to locate the two lane lines by identifying and counting blue pixels on both the left and right sides, which is fast as well as stable. We calculate a weighted average of their positions to determine a target point. The deviation between this target point and the center of the ROI represents the car’s lateral offset from the lane center. This deviation is then multiplied by a gain coefficient to adjust the PWM duty cycle for steering control. The lane-keeping function is stable and enables the car to maintain a central position within the lane with minimal side-to-side oscillation. Additionally, we handle edge cases such as when only one lane line is detected—for instance, if only the right lane is visible, the system interprets this as the car drifting far right and commands the servo to steer left at a larger angle to quickly return to the center.
Stop Paper DetectionFor red stop sign detection, we significantly reduced the image size to 50x10 since we are not concerned with the exact coordinates of red pixels, but only with whether a red region appears in the frame. This improves processing speed considerably. We apply color detection combined with morphological operations to isolate the target regions. Because we couldn’t find a reliable threshold to isolate red regions alone, we used a color threshold that captures both blue and red areas. We then subtract the previously extracted blue region from this combined result to obtain the red region mask. We scan the first and last rows of the image and count the red pixels to determine whether the edges of a red region are visible, helping the vehicle identify when it is approaching a stop sign and control its stopping position. Additionally, we track the time at which the red region is detected to confirm whether it is the same stop sign. A counter is also used to distinguish between the first stop and the final stop, allowing the system to handle each case appropriately. At last, we use a stop flag to tell the motor controller to stop.
Objection RecognitionRunning YOLOv5 on a Raspberry Pi involves setting up a lightweight Python environment with PyTorch and installing the necessary dependencies such as OpenCV and NumPy. The YOLOv5 model is typically converted to a smaller version (e.g., YOLOv5s) to fit within the Pi's limited resources. Once the model is loaded, the Pi captures frames from a camera, resizes and preprocesses them, then performs inference to detect objects in real time, albeit at a lower frame rate compared to more powerful hardware.
# main.py
from time import sleep
from threading import Thread
from motor_pid import * # Import classes and functions from motor_pid.py
from hardware_pwm import * # Import classes and functions from hardware_pwm.py
from pid_debug import *
from path_detect import *
# from plot import *
PIN_servo = "12"
PIN_motor = "13"
PERIOD = 20000000
DUTY_CYCLE_21 = int(PERIOD * 0.2142)
LOG_FILE = "/home/BURIBURI/Desktop/duty.txt"
if __name__ == "__main__":
# Initialize the camera and the servo
servo_init()
disp_thread = DisplayThread()
disp_thread.start()
while not disp_thread.windows_created:
time.sleep(0.1)
camera = CameraAnalysis()
Thread(target=run_camera_scheduler, args=(camera,)).start()
# Initialize the motor and PID
motor_init()
motor_pid = EncoderSpeedMonitor() # Create an instance of the speed monitor
motor_pid.start() # Start the monitoring thread
# debugger = PIDDebug(motor_pid) # Create a debugging platform
# Set target speed (1.0 m/s exampleqw)
motor_pid.target_speed = 0.25
motor_pid.Kp = 1.00
motor_pid.Ki = 0.032
motor_pid.Kd = 0.002
# Start the debugging window
# debugger.start()
# Start the scheduler thread
Thread(target=run_pid_scheduler, args=(pid_scheduler, 0, motor_pid)).start() #Start the PID running thread
# Create drawing instances
# plotter = DutyCyclePlotter(
# motor_monitor=motor_pid,
# camera=camera,
# mode='motor',
# y_range=(20, 22),
# y_tick_interval=0.5)
# Thread(target=run_plot_scheduler, args=(plot_scheduler, 0, plotter)).start()
# Keep the main thread running
try:
# pwmset(1, "duty_cycle", DUTY_CYCLE_21)
with open(LOG_FILE, "a") as f:
if f.tell() == 0:
f.write("timestamp,motor_duty,servo_duty\n")
while True:
# plotter.update()
# time.sleep(0.01)
motor_duty = motor_pid.duty_cycle
# servo_duty = camera.duty_cycle * 100
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
# log_line = f"{timestamp},{motor_duty:.2f},{servo_duty:.2f}\n"
# f.write(log_line)
# f.flush()
# print(f"\r motor_duty: {motor_duty:.2f} servo_duty: {servo_duty:.2f}",end="")
# debugger.update()
# print(f"\r current_speed: {monitor.current_speed:.2f}",end="")
# print(f"\r current_speed: {motor_pid.current_speed:.2f} duty_cycle: {motor_pid.duty_cycle:.2f}", end="")
except KeyboardInterrupt:
print("The program has exited safely")
import os
from time import sleep
import numpy as np
moving_flag = "none"
PIN_servo = "12"
PIN_motor = "13"
PERIOD = 20000000
DUTY_CYCLE_7_5 = int(PERIOD * 0.075)#servo/motor_middle_value Duty=7.5
DUTY_CYCLE_7_7 = int(PERIOD * 0.077)
DUTY_CYCLE_5 = int(PERIOD * 0.05)#servo_parameter_left_side/#motor_parameter_low_speed
DUTY_CYCLE_0_1 = int(PERIOD * 0.000001)
DUTY_CYCLE_9_7 = int(PERIOD * 0.097)#servo_parameter_right_side
DUTY_CYCLE_10 = int(PERIOD * 0.1)#motor_parameter_high_speed
DUTY_CYCLE_8 = int(PERIOD * 0.08)
DUTY_CYCLE_6_5 = int(PERIOD * 0.065)
#Test data
DUTY_CYCLE_20 = int(PERIOD * 0.20)
DUTY_CYCLE_21 = int(PERIOD * 0.21)
DUTY_CYCLE_21_2 = int(PERIOD * 0.215)
DUTY_CYCLE_22 = int(PERIOD * 0.22)
DUTY_CYCLE_40 = int(PERIOD * 0.40)
DUTY_CYCLE_50 = int(PERIOD * 0.50)
DUTY_CYCLE_60 = int(PERIOD * 0.60)
NODE = "/sys/class/pwm/pwmchip0"
def get_channel(pin):
channel_map = {
"12": 0,
"13": 1,
# "18": 2,
# "19": 3
}
return channel_map.get(pin)
def pwmset(channel, file, value):
with open(f"{NODE}/pwm{channel}/{file}", "w") as f:
f.write(str(value))
def configure_pwm(pin, period, duty_cycle):
channel = get_channel(pin)
if channel is None:
print(f"Unknown pin {pin}.")
return
if not os.path.isdir(f"{NODE}/pwm{channel}"):
with open(f"{NODE}/export", "w") as f:
f.write(str(channel))
pwmset(channel, "period", period)
pwmset(channel, "duty_cycle", duty_cycle)
pwmset(channel, "enable", 1)
def servo_init():
pwmset(0, "duty_cycle", DUTY_CYCLE_7_5)
sleep(1)
def motor_init():
# pwmset(1, "duty_cycle", 0)
# sleep(2)
pwmset(1, "duty_cycle", DUTY_CYCLE_21)
sleep(2)
# pwmset(3, "duty_cycle", DUTY_CYCLE_5)
# sleep(2)
# pwmset(3, "duty_cycle", DUTY_CYCLE_7_5)
def motor_forward():
global moving_flag
moving_flag = "forward"
pwmset(1, "duty_cycle", DUTY_CYCLE_21)
sleep(0.5)
pwmset(1, "duty_cycle", DUTY_CYCLE_21_2)
def motor_reverse():
global moving_flag
moving_flag = "reverse"
pwmset(1, "duty_cycle", DUTY_CYCLE_21)
sleep(0.5)
pwmset(1, "duty_cycle", DUTY_CYCLE_20)
def motor_stop(flag):
# if flag == 1: # move forward
# pwmset(1, "duty_cycle", 0)
# sleep(2)
# if flag == 1: # stop
# pwmset(1, "duty_cycle", DUTY_CYCLE_22)
# sleep(0.25)
# # pwmset(3, "duty_cycle", DUTY_CYCLE_20)
# # sleep(1)
# pwmset(1, "duty_cycle", DUTY_CYCLE_21)
print("stop")
def stop_pwm(pin):
channel = get_channel(pin)
if channel is None:
print(f"Unknown pin {pin}.")
return
pwmset(channel, "enable", 0)
with open(f"{NODE}/unexport", "w") as f:
f.write(str(channel))
if __name__ == "__main__":
configure_pwm(PIN_servo, PERIOD, DUTY_CYCLE_7_5)
configure_pwm(PIN_motor, PERIOD, 0)
# sleep(2)
# motor_init()
# sleep(3)
try:
print("Start")
while True:
# pwmset(1, "duty_cycle", DUTY_CYCLE_21_2)
pwmset(0, "duty_cycle", int(PERIOD * 0.085))
# sleep(3)
# motor_stop(moving_flag)
# sleep(2)
# motor_reverse()
# sleep(3)
# motor_stop(moving_flag)
# sleep(5)
# for duty_cycle in np.arange(DUTY_CYCLE_5,DUTY_CYCLE_9_7,DUTY_CYCLE_0_1):
# pwmset(0, "duty_cycle", duty_cycle)
# sleep(0.00001)
# for duty_cycle in np.arange(DUTY_CYCLE_9_7,DUTY_CYCLE_5,-DUTY_CYCLE_0_1):
# pwmset(0, "duty_cycle", duty_cycle)
# sleep(0.00001)
# for duty_cycle in np.arange(DUTY_CYCLE_5,DUTY_CYCLE_7_5,DUTY_CYCLE_0_1):
# pwmset(0, "duty_cycle", duty_cycle)
# sleep(0.00001)
finally:
print("end")
# stop_pwm(PIN_servo)
# stop_pwm(PIN_motor)
# This functionality includes reading the encoder pulse count
# to measure speed and using PID to adjust the motor speed.
# motor_pid.py
import sched
import gpiod
import time
import math
from threading import Thread, Lock
from hardware_pwm import *
import path_detect
import plot_waveform
USE_ENCODER_DRIVER=False
DUTY_CYCLE_21 = int(PERIOD * 0.21)
# Create a single scheduler instance
pid_scheduler = sched.scheduler(time.time, time.sleep)
def pid_task(sc, monitor):
"""PID control task"""
# Get the current speed from the monitor
current_speed = monitor.get_speed()
# print(f"\rCurrent speed: {current_speed:.2f} m/s | Total pulses: {monitor.pulse_count}", end="")
# Calculate PID output
duty = monitor.compute_pid(current_speed)
# Apply new duty cycle to motor
monitor.set_motor_duty(duty)
# Re-add to the scheduling queue (0.5-second interval)
sc.enter(0.001, 1, pid_task, (sc, monitor))
def run_pid_scheduler(sc, first_delay, monitor):
"""Scheduler running function"""
time.sleep(first_delay) # Initial delay
sc.enter(0, 1, pid_task, (sc, monitor)) # Start the first task immediately
sc.run()
class EncoderSpeedMonitor:
def __init__(self, gpio_chip="/dev/gpiochip0", gpio_line_offset=17,
wheel_diameter_m=0.025, encoder_edges_per_rev=40):
# Hardware parameters
self.WHEEL_CIRCUMFERENCE = math.pi * wheel_diameter_m
self.DISTANCE_PER_PULSE = self.WHEEL_CIRCUMFERENCE / encoder_edges_per_rev
if USE_ENCODER_DRIVER==False:
# GPIO configuration
self.chip = gpiod.Chip(gpio_chip)
self.line = self.chip.get_line(gpio_line_offset)
self.line.request(consumer="speed_monitor",
type=gpiod.LINE_REQ_EV_RISING_EDGE)
else:
self.curr_encoder_val=0
self.pre_encoder_val=0
# State variables
self.pulse_count = 0
self.lock = Lock()
self.running = False
self.thread = None
self.last_pulse_time = 0
self.DEBOUNCE_TIME = 0.0001
# PID control parameters
self.target_speed = 0.00 # Target speed in m/s
self.current_speed = 0.00 # Current speed in m/s
self.Kp = 0.00 # Proportional gain
self.Ki = 0.00 # Integral gain
self.Kd = 0.00 # Derivative gain
self.prev_error = 0.00 # Previous error for differential term
self.prev_prev_error = 0.00
self.integral = 0.00 # Integral accumulator
self.last_duty = 21.00 # Last duty cycle value (21% when stopped)
self.duty_cycle = 0.00
# PWM constraints
self.MIN_DUTY = 21.00 # Minimum duty cycle to maintain rotation
self.MAX_DUTY = 21.60 # Maximum allowed duty cycle
def _pulse_monitor(self):
"""Pulse monitoring thread core"""
while self.running:
if USE_ENCODER_DRIVER==False:
# if self.line.event_wait(sec=0.1): # Check every 100ms
event = self.line.event_read()
current_time = time.time()
if current_time - self.last_pulse_time > self.DEBOUNCE_TIME:
with self.lock:
self.pulse_count += 1
self.last_pulse_time = current_time
else:
with open("/sys/module/encoder_driver/parameters/encoder_val", "r") as f:
encoder_val = f.readlines()
self.curr_encoder_val=int(encoder_val[0].strip())
if self.curr_encoder_val!=self.pre_encoder_val:
self.pre_encoder_val=self.curr_encoder_val
self.pulse_count += 1
def start(self):
"""Start monitoring"""
self.running = True
self.thread = Thread(target=self._pulse_monitor)
self.thread.start()
print("Pulse monitoring started")
def stop(self):
"""Stop monitoring"""
self.running = False
self.thread.join()
if USE_ENCODER_DRIVER==False:
self.line.release()
self.chip.close()
print("Resources released")
def get_speed(self, interval=0.02):
"""Get current speed"""
with self.lock:
start_count = self.pulse_count
time.sleep(interval)
with self.lock:
delta = self.pulse_count - start_count
return (delta * self.DISTANCE_PER_PULSE) / interval # m/s
def compute_pid(self, current_speed):
"""Implement incremental PID control algorithm"""
# Calculate current error
self.current_speed = current_speed
error = self.target_speed - current_speed
# Calculate PID terms
motor_P = self.Kp * (error - self.prev_error)
motor_I = self.Ki * error # Accumulate integral term
motor_D = self.Kd * (error - 2*self.prev_error + self.prev_prev_error)
# Calculate total output
self.duty_cycle = motor_P + motor_I + motor_D + self.last_duty
# Apply output constraints
self.duty_cycle = max(self.MIN_DUTY, min(self.duty_cycle, self.MAX_DUTY))
# Update state variables
self.prev_prev_error = self.prev_error
self.prev_error = error
self.last_duty = self.duty_cycle
plot_waveform.plot_p_vals.append(motor_P)
plot_waveform.plot_d_vals.append(motor_D)
plot_waveform.plot_speed_pwm.append(self.duty_cycle)
return self.duty_cycle
def set_motor_duty(self, duty):
"""Set motor PWM duty cycle (placeholder - implement hardware interface)"""
# Replace with actual PWM control code
self.duty_cycle = duty
if path_detect.STOP_FLAG==1:
print("stop sign pid 1")
pwmset(1, "duty_cycle", int(0))
sleep(2)
elif path_detect.STOP_FLAG==2:
print("stop sign pid 2")
sleep(4)
pwmset(1, "duty_cycle", int(0))
print(plot_waveform.plot_p_vals,'\n',plot_waveform.plot_d_vals,'\n',plot_waveform.plot_err_vals,'\n',plot_waveform.plot_speed_pwm,'\n',plot_waveform.plot_steer_pwm,'\n')
plot_waveform.plot_pwm(plot_waveform.plot_speed_pwm, plot_waveform.plot_steer_pwm, plot_waveform.plot_err_vals, show_img=False)
plot_waveform.plot_pd(plot_waveform.plot_p_vals, plot_waveform.plot_d_vals, plot_waveform.plot_err_vals, show_img=False)
while(1):
1
else:
pwmset(1, "duty_cycle", int(duty * 0.01 * PERIOD))
# Example using RPi.GPIO:
# self.pwm.ChangeDutyCycle(duty)
# print(f"Applying duty: {duty:.1f}%") # Debug output
# if __name__ == "__main__":
# monitor = EncoderSpeedMonitor()
# monitor.start()
# # Start the single timer (immediate start, 2-second interval)
# Thread(target=run_scheduler, args=(scheduler, 0, monitor)).start()
#
# # Keep the main thread running
# try:
# while True:
# time.sleep(1)
# except KeyboardInterrupt:
# print("The program has exited safely")
import cv2
import numpy as np
import matplotlib.pyplot as plt
import platform
import time
import threading
from threading import Thread
from hardware_pwm import *
import plot_waveform
# https://www.hackster.io/really-bad-idea/autonomous-path-following-car-6c4992
# this flag is used to control whether stop is required
STOP_FLAG=0
# Global synchronization primitives
exit_event = threading.Event()
display_queue = []
display_lock = threading.Lock()
class DisplayThread(threading.Thread):
"""Handles all GUI operations and visualization"""
def __init__(self):
super().__init__()
self.windows_created = False
def run(self):
"""Main display loop with all visualization windows"""
cv2.namedWindow("Trackbars", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Trackbars", 500, 600)
self.create_trackbars()
# Create all visualization windows
# cv2.namedWindow("blue_mask", cv2.WINDOW_NORMAL)
self.windows_created = True
while not exit_event.is_set():
with display_lock:
if display_queue:
for name, img in display_queue:
cv2.imshow(name, img)
display_queue.clear()
if cv2.waitKey(25) & 0xFF == ord('q'):
exit_event.set()
time.sleep(0.02)
cv2.destroyAllWindows()
def create_trackbars(self):
"""Initialize parameter adjustment interface"""
cv2.createTrackbar("H Low", "Trackbars", 0, 179, lambda x: None)
cv2.createTrackbar("S Low", "Trackbars", 70, 255, lambda x: None)
cv2.createTrackbar("V Low", "Trackbars", 120, 255, lambda x: None)
cv2.createTrackbar("H High", "Trackbars", 179, 179, lambda x: None)
cv2.createTrackbar("S High", "Trackbars", 255, 255, lambda x: None)
cv2.createTrackbar("V High", "Trackbars", 255, 255, lambda x: None)
class CameraAnalysis:
"""Main processing class for lane detection and analysis"""
def __init__(self):
# setup frame params
resize= 0.2
self.width,self.height = [1280, 720]
self.resized_width, self.resized_height=[int(self.width*resize),int(self.height*resize)]
self.roi_y1,self.roi_y2,self.roi_x1,self.roi_x2=[self.resized_height //3,self.resized_height *2//3, 0,self.resized_width-10]
self.roi_width,self.roi_height=(self.roi_x2-self.roi_x1), (self.roi_y2-self.roi_y1)
print(self.roi_x2,self.roi_x1,self.roi_y2,self.roi_y1)
# setup hsv threshold
self.lower_blue = np.array([60, 40, 100])
self.upper_blue = np.array([120, 255, 255])
self.lower_red = np.array([0, 70, 120])
self.upper_red = np.array([179, 255, 255])
# setup variable to determine how many times stoped
self.detected_cnt=0
self.previous_detected_time=0
# initialize camera
self.initialize_camera()
self.frame_count=0
def initialize_camera(self):
"""Configure camera source based on platform"""
if platform.system() == 'Windows':
self.capture = cv2.VideoCapture(0, cv2.CAP_DSHOW)
else:
self.capture = cv2.VideoCapture(0)
self.capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
if not self.capture.isOpened():
raise RuntimeError("Failed to initialize video source")
# setup capture size
self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
def __del__(self):
"""Release camera resources"""
self.capture.release()
def process_frame(self):
"""Main image processing pipeline"""
ret, frame = self.capture.read()
self.frame_count+=1
# print("frame_count=",self.frame_count)
if not ret or self.frame_count==400:
print("Frame is lost or Frame count limit is met\n")
exit_event.set()
return None
"""Image preprocessing and ROI extraction"""
self.resized_img = cv2.resize(frame, (self.resized_width, self.resized_height)) # resize the img
self.roi = self.resized_img[self.roi_y1:self.roi_y2,self.roi_x1:self.roi_x2] # get roi
self.hsv_img = cv2.cvtColor(self.roi, cv2.COLOR_BGR2HSV) # convert to HSV
self.queue_display("resized_img", self.resized_img)
# Line detection
deviation=self.detect_lane_lines()
# Stop sign detection
self.stop_sign_detection()
return deviation
def detect_lane_lines(self):
"""Lane line detection"""
# get blue region
self.blue_mask = cv2.inRange(self.hsv_img, self.lower_blue,self.upper_blue)
self.blue_mask = cv2.morphologyEx(self.blue_mask, cv2.MORPH_OPEN, np.ones((5,5), np.uint8))
# Iterate over each pixel
vote1_x_left=0
vote1_i_left=0
vote1_x_right=0
vote1_i_right=0
for y in range(self.roi_height-3,self.roi_height):
for x in range(self.roi_width):
pixel_value = self.blue_mask[y, x]
if pixel_value == 255: # white
if x<self.roi_width//2: # left line
vote1_x_left+=x
vote1_i_left+=1
else: # right line
vote1_x_right+=x
vote1_i_right+=1
# print(vote1_i_left,vote1_i_right)
if vote1_i_left>30 and vote1_i_right >30: # see both lines
target_x=(vote1_x_left//vote1_i_left+vote1_x_right//vote1_i_right)//2
# print("[both]target_x",target_x)
elif vote1_i_left<=30 and vote1_i_right>0: # only see the right line
target_x=vote1_x_right//vote1_i_right-self.roi_width//4
# print("[right]target_x",target_x)
if target_x < 0:
target_x=0
elif vote1_i_right<=30 and vote1_i_left>0: # only see the left line
target_x=vote1_x_left//vote1_i_left+self.roi_width//4
# print("[left]target_x",target_x)
if target_x > self.roi_width:
target_x=self.roi_width-1
else: # see neither line
target_x=self.roi_width//2
# draw the target
cv2.line(self.roi, (target_x,self.roi_height), (target_x,self.roi_height), (0,0,255),8)
self.queue_display("blue_mask", self.blue_mask)
self.queue_display("roi_target", self.roi)
return target_x-self.roi_width//2
def stop_sign_detection(self):
# lower, upper = self.read_parameters()
# further resize the frame to accelerate speed
red_hsv_img=cv2.resize(self.hsv_img, (self.roi_width//5,self.roi_height//5))
resize_blue_mask=cv2.resize(self.blue_mask, (self.roi_width//5,self.roi_height//5))
# get red region
red_mask = cv2.inRange(red_hsv_img, self.lower_red,self.upper_red)
red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_OPEN, np.ones((3,3), np.uint8))
diffImg = cv2.subtract(red_mask,resize_blue_mask)
# self.queue_display("red_mask", red_mask)
self.queue_display("diffImg", diffImg)
vote1_i_red1=0
vote1_i_red2=0
for x in range(self.roi_width//5):
pixel_value1 = diffImg[self.roi_height//5-1, x]
pixel_value2 = diffImg[0, x]
if pixel_value1 == 255: # white
vote1_i_red1+=1
if pixel_value2 == 255: # white
vote1_i_red2+=1
global STOP_FLAG
STOP_FLAG=0
if vote1_i_red1>6 and vote1_i_red2<6: # stop sign detected
current_time = int(time.time())
# print(f"current_time {current_time}")
if current_time-self.previous_detected_time>7: # make sure not to detect the same stop sign
print("stop sign path")
self.detected_cnt+=1
self.previous_detected_time=current_time
if self.detected_cnt==2: # permanent stop
STOP_FLAG=2
else:
STOP_FLAG=1
def read_parameters(self):
"""Thread-safe parameter reading from trackbars"""
h_low = cv2.getTrackbarPos("H Low", "Trackbars")
s_low = cv2.getTrackbarPos("S Low", "Trackbars")
v_low = cv2.getTrackbarPos("V Low", "Trackbars")
h_high = cv2.getTrackbarPos("H High", "Trackbars")
s_high = cv2.getTrackbarPos("S High", "Trackbars")
v_high = cv2.getTrackbarPos("V High", "Trackbars")
lower = np.array([h_low, s_low, v_low])
upper = np.array([h_high, s_high, v_high])
return lower , upper
def queue_display(self, name, frame):
"""Thread-safe display queue update"""
with display_lock:
display_queue.append((name, frame))
def run_camera_scheduler(camera):
"""Main processing thread workflow"""
while not exit_event.is_set():
deviation=camera.process_frame()
if deviation is None:
break
servo_duty = int((0.075 + deviation*0.0005) * PERIOD)
pwmset(0, "duty_cycle", servo_duty)
plot_waveform.plot_err_vals.append(deviation/100)
plot_waveform.plot_steer_pwm.append(servo_duty/PERIOD*100)
# print(f"Steering deviation: {deviation} pixels,{servo_duty/PERIOD} %duty")
time.sleep(0.1)
if __name__ == "__main__":
disp_thread = DisplayThread()
disp_thread.start()
while not disp_thread.windows_created:
time.sleep(0.1)
try:
camera = CameraAnalysis()
proc_thread = Thread(target=run_camera_scheduler, args=(camera,))
proc_thread.start()
while not exit_event.is_set():
time.sleep(1)
except KeyboardInterrupt:
exit_event.set()
finally:
exit_event.set()
# proc_thread.join()
# disp_thread.join()
# del camer
// This file has code taken from and influenced by the following sources:
// User raja_961, Autonomous Lane-Keeping Car Using Raspberry
// Pi and OpenCV. Instructables. URL:
// https://www.instructables.com/Autonomous-Lane-Keeping-Car-U
// sing-Raspberry-Pi-and
// other reference
// https://www.hackster.io/big-brains/big-brains-elec-424-final-project-team-11-53c862
// https://www.hackster.io/m-e-g-g/the-magnificent-m-e-g-g-car-28ec89
#include <linux/module.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
// timer and interrupt header files
#include <linux/ktime.h>
#include <linux/hrtimer.h>
#include <linux/interrupt.h>
// Timer variables
ktime_t prev_time; // the time of first detection
ktime_t curr_time; // the time of second detection
s64 delta_time; // the duration between two detections
// the value to be read from main
static int encoder_val;
module_param(encoder_val, int, 0644);
/** variable contains pin number o interrupt controller to which GPIO 17 is mapped to */
unsigned int irq_number;
// variables related to encoder, which will be used in the gpiod function
struct gpio_desc *encoder;
// Interrupt service routine is called, when interrupt is triggered
static irqreturn_t gpio_irq_handler(int irq, void *dev_id)
{
// debounce or first detection
curr_time=ktime_get_ns();
delta_time=curr_time-prev_time;
if (delta_time<300000){ // less than 1ms -> debounce
// printk("Debounce: delta_time: %lld ns\n",delta_time);
;
}else{ // more than 1ms -> first detection
prev_time=curr_time; // record the time of first detection
encoder_val=delta_time;
printk("delta_time: %lld ns\n",delta_time);
}
return IRQ_HANDLED;
}
// probe function
static int encoder_probe(struct platform_device *pdev)
{
printk("Enter encoder probe\n");
// init encoder GPIO
encoder=devm_gpiod_get(&pdev->dev, "encoder", GPIOD_IN);
if(IS_ERR(encoder)) {
printk("devm_gpiod_get failed\n");
return -1;
}
// Setup the interrupt
irq_number = gpiod_to_irq(encoder);
if(request_irq(irq_number, gpio_irq_handler, IRQF_TRIGGER_FALLING, "my_gpio_irq", NULL) != 0){
printk("Error!\nCan not request interrupt nr.: %d\n", irq_number);
return -1;
}
printk("GPIO 17 is mapped to IRQ Nr.: %d\n", irq_number);
prev_time = ktime_get_ns();
return 0;
}
// remove function
static int encoder_remove(struct platform_device *pdev)
{
// Free the irq and print a message
free_irq(irq_number, NULL);
printk("encoder_remove\n");
return 0;
}
static struct of_device_id matchy_match[] = {
{.compatible="mycompany,myencoder",},
{/* leave alone - keep this here (end node) */},
};
// platform driver object
static struct platform_driver adam_driver = {
.probe = encoder_probe,
.remove = encoder_remove,
.driver = {
.name = "The Rock: this name doesn't even matter",
.owner = THIS_MODULE,
.of_match_table = matchy_match,
},
};
module_platform_driver(adam_driver);
MODULE_DESCRIPTION("Final Project");
MODULE_AUTHOR("gj20");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:adam_driver");
/dts-v1/;
/plugin/;
/ {
fragment@0 {
target-path = "/";
__overlay__ {
my_encoder_device {
compatible = "mycompany,myencoder";
encoder-gpios = <&gpio 17 0>;
};
};
};
};
import cv2
import numpy as np
import platform
import time
import threading
from threading import Thread
# from hardware_pwm import *
# Global synchronization primitives
exit_event = threading.Event()
display_queue = []
display_lock = threading.Lock()
class DisplayThread(threading.Thread):
"""Handles all GUI operations and visualization"""
def __init__(self):
super().__init__()
self.windows_created = False
def run(self):
"""Main display loop with all visualization windows"""
cv2.namedWindow("Trackbars", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Trackbars", 500, 600)
self.create_trackbars()
# Create all visualization windows
cv2.namedWindow("ROI", cv2.WINDOW_NORMAL)
cv2.namedWindow("blue_mask", cv2.WINDOW_NORMAL)
# cv2.namedWindow("canny", cv2.WINDOW_NORMAL)
# cv2.namedWindow("fitline", cv2.WINDOW_NORMAL)
cv2.namedWindow("target", cv2.WINDOW_NORMAL)
self.windows_created = True
while not exit_event.is_set():
with display_lock:
if display_queue:
for name, img in display_queue:
cv2.imshow(name, img)
display_queue.clear()
if cv2.waitKey(25) & 0xFF == ord('q'):
exit_event.set()
time.sleep(0.02)
cv2.destroyAllWindows()
def create_trackbars(self):
"""Initialize parameter adjustment interface"""
cv2.createTrackbar("H Low", "Trackbars", 60, 179, lambda x: None)
cv2.createTrackbar("S Low", "Trackbars", 40, 255, lambda x: None)
cv2.createTrackbar("V Low", "Trackbars", 100, 255, lambda x: None)
cv2.createTrackbar("H High", "Trackbars", 120, 179, lambda x: None)
cv2.createTrackbar("S High", "Trackbars", 255, 255, lambda x: None)
cv2.createTrackbar("V High", "Trackbars", 255, 255, lambda x: None)
cv2.createTrackbar("threshold", "Trackbars", 60, 200, lambda x: None)
cv2.createTrackbar("minLineLength", "Trackbars", 10, 100, lambda x: None)
cv2.createTrackbar("maxLineGap", "Trackbars", 100, 100, lambda x: None)
class CameraAnalysis:
"""Main processing class for lane detection and analysis"""
def __init__(self):
# setup frame params
resize= 0.2
self.width,self.height = [1280, 720]
self.resized_width, self.resized_height=[int(self.width*resize),int(self.height*resize)]
self.roi_y1,self.roi_y2,self.roi_x1,self.roi_x2=[self.height//3,self.height*2//3, 0,self.resized_width]
self.roi_width,self.roi_height=(self.roi_x2-self.roi_x1), (self.roi_y2-self.roi_y1)
# warpPerspective Transformation
# pts1 = np.float32([[35, 480], [240, 0], [580, 480], [370, 0]]) # original frame
# pts2 = np.float32([[35, 480], [35 , 0], [580, 480], [580, 0]]) # transformed frame
# self.PTM = cv2.getPerspectiveTransform(pts1, pts2) # calculate the warpPerspective Transformation matrix
# initialize camera
self.initialize_camera()
def initialize_camera(self):
"""Configure camera source based on platform"""
if platform.system() == 'Windows':
self.capture = cv2.VideoCapture(0, cv2.CAP_DSHOW)
else:
self.capture = cv2.VideoCapture(0)
self.capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
if not self.capture.isOpened():
raise RuntimeError("Failed to initialize video source")
self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
def __del__(self):
"""Release camera resources"""
self.capture.release()
def process_frame(self):
"""Main image processing pipeline"""
ret, frame = self.capture.read()
if not ret:
exit_event.set()
return None
"""Image preprocessing and ROI extraction"""
self.resized_img = cv2.resize(frame, (self.resized_width, self.resized_height)) # resize the img
self.roi = self.resized_img[self.roi_y1:self.roi_y2,self.roi_x1:self.roi_x2] # get roi
# PTM_img = cv2.warpPerspective(self.roi, self.PTM, (self.roi_width, self.roi_height)) # warpPerspective Transformation
self.hsv_img = cv2.cvtColor(self.roi, cv2.COLOR_BGR2HSV) # convert to HSV
self.queue_display("resized_img", self.resized_img)
# self.queue_display("PTM_frame", PTM_img)
self.queue_display("ROI", self.roi)
# Line detection
deviation=self.analyze_lanes()
# Stop sign detection
return deviation
def analyze_lanes(self):
"""Complete lane analysis workflow"""
deviation = self.detect_lane_lines()
# left_center = self.fit_line(left_points)
# right_center = self.fit_line(right_points)
# return self.calculate_deviation(left_center, right_center)
return deviation
def detect_lane_lines(self):
"""Lane line detection using color thresholding and Hough transform"""
lower, upper, threshold, min_len, max_gap = self.read_parameters()
mask = cv2.inRange(self.hsv_img, lower, upper)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((5,5), np.uint8))
#
vote1_x_left=0
vote1_i_left=0
vote1_x_right=0
vote1_i_right=0
for y in range(self.roi_height-3,self.roi_height):
for x in range(self.roi_width):
pixel_value = mask[y, x]
if pixel_value == 255: # white
if x<self.roi_width//2:
vote1_x_left+=x
vote1_i_left+=1
else:
vote1_x_right+=x
vote1_i_right+=1
print(vote1_i_left,vote1_i_right)
if vote1_i_left>100 and vote1_i_right >100:
target_x=(vote1_x_left//vote1_i_left+vote1_x_right//vote1_i_right)//2
print("[both]target_x",target_x)
elif vote1_i_left<=100 and vote1_i_right >0: # only see the right line
target_x=vote1_x_right//vote1_i_right-self.roi_width//4
print("[right]target_x",target_x)
if target_x < 0:
target_x=0
elif vote1_i_right<=100 and vote1_i_left>0: # only see the left line
target_x=vote1_x_left//vote1_i_left+self.roi_width//4
print("[left]target_x",target_x)
if target_x > self.roi_width:
target_x=self.roi_width-1
else:
target_x=self.roi_width//2
cv2.line(self.roi, (target_x,self.roi_height), (target_x,self.roi_height), (0,0,255),10)
# edges = cv2.Canny(mask, 150, 180)
# lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold,
# minLineLength=min_len, maxLineGap=max_gap)
self.queue_display("blue_mask", mask)
self.queue_display("roi_withline", self.roi)
# self.queue_display("canny", edges)
# return self.classify_points(lines)
return target_x-self.roi_width//2
def fit_line(self, points):
"""Line fitting"""
if not points or len(points) < 2:
return None
points = np.array(points).reshape(-1, 2)
vx, vy, x0, y0 = cv2.fitLine(points, cv2.DIST_L2, 0, 0.01, 0.01).flatten()
if abs(vx) < 0.001:
cv2.line(self.roi, (int(x0),0), (int(x0),self.height), (0,0,255),2)
center = int(x0)
else:
y1 = int(y0 - x0 * vy / vx)
y2 = int(y0 + (self.width - x0) * vy / vx)
cv2.line(self.roi, (0,y1), (self.width,y2), (0,0,255),2)
if abs(vy) < 0.001:
return self.width//2
center = int(x0 - y0 * vx / vy)
# self.queue_display("fitline", self.roi)
return center
def calculate_deviation(self, left, right):
"""Calculate steering deviation from lane center"""
if left is None or right is None:
return 0
center = (left + right) // 2
cv2.line(self.roi, (self.width//2, self.height//3+5),
(center,0), (255,255,0),2)
cv2.line(self.roi, (self.width//2, self.height//3+5),
(self.width//2,0), (255,0,255),2)
self.queue_display("target", self.roi)
return center - self.width//2
def classify_points(self, lines):
"""Classify detected points into left/right lanes"""
left, right = [], []
if lines is not None:
for line in lines:
x1, y1, x2, y2 = line[0]
if x1 < self.width//2:
left.extend([[x1,y1], [x2,y2]])
else:
right.extend([[x1,y1], [x2,y2]])
cv2.line(self.roi, (x1,y1), (x2,y2), (0,255,0),2)
return left, right
def read_parameters(self):
"""Thread-safe parameter reading from trackbars"""
h_low = cv2.getTrackbarPos("H Low", "Trackbars")
s_low = cv2.getTrackbarPos("S Low", "Trackbars")
v_low = cv2.getTrackbarPos("V Low", "Trackbars")
h_high = cv2.getTrackbarPos("H High", "Trackbars")
s_high = cv2.getTrackbarPos("S High", "Trackbars")
v_high = cv2.getTrackbarPos("V High", "Trackbars")
threshold = cv2.getTrackbarPos("threshold", "Trackbars")
minLineLength = cv2.getTrackbarPos("minLineLength", "Trackbars")
maxLineGap = cv2.getTrackbarPos("maxLineGap", "Trackbars")
lower = np.array([h_low, s_low, v_low])
upper = np.array([h_high, s_high, v_high])
return lower , upper , threshold, minLineLength, maxLineGap
def queue_display(self, name, frame):
"""Thread-safe display queue update"""
with display_lock:
display_queue.append((name, frame))
def run_camera_scheduler(camera):
"""Main processing thread workflow"""
while not exit_event.is_set():
deviation=camera.process_frame()
if deviation is None:
break
print(f"Steering deviation: {deviation} pixels")
time.sleep(0.1)
if __name__ == "__main__":
disp_thread = DisplayThread()
disp_thread.start()
while not disp_thread.windows_created:
time.sleep(0.1)
try:
camera = CameraAnalysis()
proc_thread = Thread(target=run_camera_scheduler, args=(camera,))
proc_thread.start()
while not exit_event.is_set():
time.sleep(1)
except KeyboardInterrupt:
exit_event.set()
finally:
exit_event.set()
# proc_thread.join()
# disp_thread.join()
# del camer
Comments