import cv2
import numpy as np
import math
import time
from gpiozero import PWMOutputDevice
import matplotlib.pyplot as plt
import signal
import sys
encoder_path = "/sys/module/gpiod_driver/parameters/encoder_interval_ns"
encoder_values = []
# smaller value corresponds to faster rotation
max_rotate = 5000000
min_rotate = 8500000
delta = 0.001
frame_numbers = []
errors = []
steering_pwms = []
speed_pwms = []
proportional_responses = []
derivative_responses = []
motor_pwm = PWMOutputDevice(19, frequency = 50)
steering_pwm = PWMOutputDevice(18, frequency = 50)
# Signal handler function to gracefully stop the program and plot collected data
def signal_handler(sig, frame):
print("\n[INFO] Ctrl+C detected. Plotting data and exiting...")
# Set motor and steering PWM values to neutral to stop the car
motor_pwm.value = .075
steering_pwm.value = .075
# Plot the proportional and derivative responses with error
plot_propdir(proportional_responses, derivative_responses, errors)
# Plot the PWM signals for speed and steering, along with error
plot_pwm(speed_pwms, steering_pwms, errors)
# Exit the program cleanly
sys.exit(0)
# Register the above signal handler for Ctrl+C (SIGINT)
signal.signal(signal.SIGINT, signal_handler)
# Function to plot proportional and derivative control values along with error
def plot_propdir(p_vals, d_vals, error):
# Create a figure with a primary y-axis and a secondary y-axis
fig, ax1 = plt.subplots()
t_ax = np.arange(len(p_vals)) # X-axis: time/frame index
# Plot proportional (P) and derivative (D) values on primary y-axis
ax1.plot(t_ax, p_vals, '-', label="P-vals")
ax1.plot(t_ax, d_vals, '-', label="D-vals")
# Secondary y-axis for error values
ax2 = ax1.twinx()
ax2.plot(t_ax, error, '--r', label="Error") # Dashed red line for error
ax1.set_xlabel("Frames")
ax1.set_ylabel("PD Value")
ax2.set_ylim(-90, 90)
ax2.set_ylabel("Error value")
# Add title and legend, adjust layout and save plot to file
plt.title("PD Values over time")
fig.legend()
fig.tight_layout()
plt.savefig("/home/pi/test_pd_plot.png")
plt.clf() # Clear the figure to free memory
return
def plot_pwm(speed_pwms, turn_pwms, error):
# Create a figure with dual y-axes
fig, ax1 = plt.subplots()
t_ax = np.arange(len(speed_pwms)) # X-axis: time/frame index
# Plot speed and steering PWM values on primary y-axis
ax1.plot(t_ax, speed_pwms, '-', label="Speed")
ax1.plot(t_ax, turn_pwms, '-', label="Steering")
# Normalize and plot error on the same primary y-axis for visual comparison
ax1.plot(t_ax, error / np.max(error), '--r', label="Error")
ax1.set_xlabel("Frames")
ax1.set_ylabel("Speed and Steer Duty Cycle")
# Define secondary axis
ax2 = ax1.twinx()
ax2.set_ylabel("% Error Value") # Optional secondary label
plt.title("Speed + Steer Duty Cycle + error vs time")
fig.legend()
plt.savefig("/home/pi/test_change_steer_plot.png")
plt.clf() # Clear the figure after saving
return
def convert_to_HSV(frame):
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# cv2.imshow("HSV", hsv)
return hsv
def detect_edges(frame):
# lowered the threshold
lower_blue = np.array([85, 50, 50], dtype="uint8")
upper_blue = np.array([140, 255, 255], dtype="uint8")
# this mask will filter out everything but blue
mask = cv2.inRange(hsv, lower_blue, upper_blue)
# detect edges
edges = cv2.Canny(mask, 50, 100)
#cv2.imshow("edges", edges)
return edges
def detect_red(frame):
# low_red = np.array([161, 155, 84])
# high_red = np.array([179, 255, 255])
# red_mask = cv2.inRange(frame, low_red, high_red)
# lower mask (0-10)
lower_red = np.array([0, 50, 50])
upper_red = np.array([10, 255, 255])
mask0 = cv2.inRange(frame, lower_red, upper_red)
# upper mask (170-180)
lower_red = np.array([170, 50, 50])
upper_red = np.array([180, 255, 255])
mask1 = cv2.inRange(frame, lower_red, upper_red)
# join my masks
red_mask = cv2.bitwise_or(mask0, mask1)
# red = cv2.bitwise_and(frame, frame, mask = red_mask)
countRed = np.count_nonzero(red_mask)
# if countRed > 0:
# print("Red value: ",countRed)
if countRed > 10:
return True
else:
return False
def detect_green(frame):
low_green = np.array([25, 52, 72])
high_green = np.array([102, 255, 255])
green_mask = cv2.inRange(frame, low_green, high_green)
# red = cv2.bitwise_and(frame, frame, mask = red_mask)
countGreen = np.count_nonzero(green_mask)
# print("Green value: ",countGreen)
if countGreen > 110:
return True
else:
return False
def manage_speed():
f = open(encoder_path, "r")
encode_val = int(f.readline())
f.close()
encoder_values.append(encode_val)
print("Calculated speed")
print(encode_val)
if encode_val <= max_rotate:
print("reduce speed")
ret_val = -delta
elif encode_val >= min_rotate:
print("increase speed")
ret_val = delta
else:
print("maintain speed")
ret_val = 0 # Default if within range
return ret_val
def region_of_interest(edges):
height, width = edges.shape # extract the height and width of the edges frame
# make an empty matrix with same dimensions of the edges frame
mask = np.zeros_like(edges)
# only focus lower half of the screen
# specify the coordinates of 4 points (lower left, upper left, upper right, lower right)
polygon = np.array([[
(0, height),
(0, height/2),
(width, height/2),
(width, height),
]], np.int32)
cv2.fillPoly(mask, polygon, 255) # fill the polygon with blue color
cropped_edges = cv2.bitwise_and(edges, mask)
# cv2.imshow("roi", cropped_edges)
return cropped_edges
def detect_line_segments(cropped_edges):
rho = 1
theta = np.pi / 180
min_threshold = 10
line_segments = cv2.HoughLinesP(cropped_edges, rho, theta, min_threshold,
np.array([]), minLineLength=5, maxLineGap=0)
return line_segments
def make_points(frame, line):
height, width, _ = frame.shape
slope, intercept = line
y1 = height # bottom of the frame
y2 = int(y1 / 2) # make points from middle of the frame down
if slope == 0:
slope = 0.1
x1 = int((y1 - intercept) / slope)
x2 = int((y2 - intercept) / slope)
return [[x1, y1, x2, y2]]
def average_slope_intercept(frame, line_segments):
lane_lines = []
if line_segments is None:
# print("no line segment detected")
return lane_lines
height, width, _ = frame.shape
left_fit = []
right_fit = []
boundary = 1/3
left_region_boundary = width * (1 - boundary)
right_region_boundary = width * boundary
for line_segment in line_segments:
for x1, y1, x2, y2 in line_segment:
if x1 == x2:
# print("skipping vertical lines (slope = infinity)")
continue
fit = np.polyfit((x1, x2), (y1, y2), 1)
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - (slope * x1)
if slope < 0:
if x1 < left_region_boundary and x2 < left_region_boundary:
left_fit.append((slope, intercept))
else:
if x1 > right_region_boundary and x2 > right_region_boundary:
right_fit.append((slope, intercept))
left_fit_average = np.average(left_fit, axis=0)
if len(left_fit) > 0:
lane_lines.append(make_points(frame, left_fit_average))
right_fit_average = np.average(right_fit, axis=0)
if len(right_fit) > 0:
lane_lines.append(make_points(frame, right_fit_average))
# lane_lines is a 2-D array consisting the coordinates of the right and left lane lines
# for example: lane_lines = [[x1,y1,x2,y2],[x1,y1,x2,y2]]
# where the left array is for left lane and the right array is for right lane
# all coordinate points are in pixels
return lane_lines
def display_lines(frame, lines, line_color=(0, 255, 0), line_width=6): # line color (B,G,R)
line_image = np.zeros_like(frame)
if lines is not None:
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(line_image, (x1, y1), (x2, y2),
line_color, line_width)
line_image = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
return line_image
def get_steering_angle(frame, lane_lines):
height, width, _ = frame.shape
if len(lane_lines) == 2: # if two lane lines are detected
# extract left x2 from lane_lines array
_, _, left_x2, _ = lane_lines[0][0]
# extract right x2 from lane_lines array
_, _, right_x2, _ = lane_lines[1][0]
mid = int(width / 2)
x_offset = (left_x2 + right_x2) / 2 - mid
y_offset = int(height / 2)
print("both line")
elif len(lane_lines) == 1: # if only one line is detected
x1, _, x2, _ = lane_lines[0][0]
x_offset = x2 - x1
y_offset = int(height / 2)
print("1 line")
elif len(lane_lines) == 0: # if no line is detected
x_offset = 0
y_offset = int(height / 2)
print("No line detected")
angle_to_mid_radian = math.atan(x_offset / y_offset)
angle_to_mid_deg = int(angle_to_mid_radian * 180.0 / math.pi)
steering_angle = angle_to_mid_deg + 90
return steering_angle
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
steering_angle_radian = steering_angle / 180.0 * math.pi
x1 = int(width / 2)
y1 = height
x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
return heading_image
def set_duty_cycle(device, duty_cycle_percent):
"""Set PWM device value based on duty cycle percentage."""
device.value = duty_cycle_percent / 100
def display_heading_line(frame, steering_angle, line_color=(0, 0, 255), line_width=5):
heading_image = np.zeros_like(frame)
height, width, _ = frame.shape
steering_angle_radian = steering_angle / 180.0 * math.pi
x1 = int(width / 2)
y1 = height
x2 = int(x1 - height / 2 / math.tan(steering_angle_radian))
y2 = int(height / 2)
cv2.line(heading_image, (x1, y1), (x2, y2), line_color, line_width)
heading_image = cv2.addWeighted(frame, 0.8, heading_image, 1, 1)
return heading_image
# Variables to be updated each loop
lastTime = 0
lastError = 0
# PWM pin
neutral_speed_pwm = 0.075
# Car only moves when at .08 threshold.
base_speed_pwm = neutral_speed_pwm + 0.005
def get_pid_control(deviation, last_error, integral, last_time):
# Current time
current_time = time.time()
delta_time = current_time - last_time
# Proportional term
P = Kp * deviation
# Integral term (accumulating error over time)
integral += deviation * delta_time
I = Ki * integral
# Derivative term (change of error over time)
D = Kd * (deviation - last_error) / delta_time if delta_time > 0 else 0
# Compute PID output
pid_output = P + I + D
# Return new PID values
return pid_output, last_error, integral, current_time
if __name__ == "__main__":
video = cv2.VideoCapture(0)
video.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
video.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
# Variables to be updated each loop
last_time = time.time() # Last time the PID control was updated
last_error = 0 # Last deviation
integral = 0 # Integral of error
rotbase = 7.5
lastTime = 0
lastError = 0
j= 0
# PID constants (Adjusted for noisy input)
Kp = 0.1 # Lower proportional constant to avoid overshoot due to noise
Ki = 0.02 # Small integral constant to prevent drift without amplifying noise
Kd = 0.6 # Higher derivative constant to better dampen the effect of noise and rapid fluctuations
#Adding data points
der_r = np.array([])
pro_r = np.array([])
err = np.array([])
steering = np.array([])
speed = np.array([])
curr_speed = .075
# === ESC Calibration ===
print("Calibrating ESC...")
set_duty_cycle(motor_pwm, 10) # Max motor_pwm
time.sleep(2)
set_duty_cycle(motor_pwm, 5) # Mid motor_pwm
time.sleep(2)
set_duty_cycle(motor_pwm, 7.5) # Neutral
time.sleep(2)
#stop_motors()
time.sleep(1)
frame_no = -1
# PI constants
kpr=5
kdr=8
counter = 0
try:
while True:
frame_no += 1
ret, frame = video.read()
frame = cv2.resize(frame, (150, 100), interpolation=cv2.INTER_CUBIC)
hsv = convert_to_HSV(frame)
edges = detect_edges(hsv)
roi = region_of_interest(edges)
line_segments = detect_line_segments(roi)
lane_lines = average_slope_intercept(frame, line_segments)
lane_line_image = display_lines(frame, lane_lines)
steering_angle = get_steering_angle(frame, lane_lines)
deviation = steering_angle - 90 # Calculate the deviation from the center (90 degrees)
errors.append(deviation)
frame_numbers.append(frame_no)
# PD control
now = time.time()
dovert = now - lastTime if lastTime > 0 else 0.01 # Avoid division by zero
dv_r = kdr * (deviation - lastError) / dovert
pr_r = kpr * deviation
# Store responses for plotting
proportional_responses.append(pr_r)
derivative_responses.append(dv_r)
# Save steering and speed for plotting
steering_pwms.append(steering_pwm.value * 100) # Convert back to % for readability
speed_pwms.append(motor_pwm.value * 100)
# Apply PID control to smooth the steering
pid_output, last_error, integral, last_time = get_pid_control(deviation, last_error, integral, last_time)
# Steering control logic using the PID output
# print(f"Steering Angle: {steering_angle}, Deviation: {deviation}, PID Output: {pid_output}")
cv2.imshow('Edges', edges)
final_output = display_heading_line(lane_line_image, steering_angle)
cv2.imshow("Lane Detection Output", final_output)
#deviation calculation for P and D
now = time.time()
dt = now - lastTime
deviation = steering_angle - 90
counter = counter + 1
if deviation < 3 and deviation > -3: #Min margin before turning
#Find new derivative and proportional gain
deviation = 0
derivative_r = kdr * (deviation - lastError) / dt
proportional_r = kpr * deviation
PD_r = derivative_r + proportional_r
rot = rotbase - PD_r/90
steering_pwm.value = 7.5/100
# print("rot is")
# print(rot)
if counter % 3 == 0:
temp_speed = manage_speed() + curr_speed
print('Temp speed')
print(temp_speed)
if temp_speed != curr_speed:
if temp_speed >= .088:
temp_speed = .088
motor_pwm.value = .088
elif temp_speed <= .075:
temp_speed = .075
motor_pwm.value = .075
else:
motor_pwm.value = temp_speed
curr_speed = temp_speed
motor_pwm.value = curr_speed
else:
#Find new derivative and proportional gain
derivative_r = kdr * (deviation - lastError) / dt
proportional_r = kpr * deviation
PD_r = derivative_r + proportional_r
rot = rotbase + PD_r/90
# print("rot is")
# print(rot)
#Making sure rot stay in the correct duty cycle range.
if rot>9.5:
rot = 9.5
elif rot<5.5:
rot = 5.5
steering_pwm.value = rot/100
if counter % 3 == 0:
temp_speed = manage_speed() + curr_speed
print('Temp speed')
print(temp_speed)
if temp_speed != curr_speed:
print("Speed mismatch")
if temp_speed >= .088:
temp_speed = .088
motor_pwm.value = .088
elif temp_speed <= .075:
temp_speed = .075
motor_pwm.value = .075
else:
motor_pwm.value = temp_speed
curr_speed = temp_speed
print("curr_speed")
print(curr_speed)
motor_pwm.value = curr_speed
key = cv2.waitKey(1)
time.sleep(.025)
if key == 27: # ESC key pressed
# plot_encoder_values()
motor_pwm.value = .075
steering_pwm.value = .075
plot_propdir(proportional_responses, derivative_responses, errors)
plot_pwm(speed_pwms, steering_pwms, errors)
break
finally:
# plot_control_vs_frame()
# plot_pid_components()
video.release()
cv2.destroyAllWindows()
Comments