Valeria Itrat HerreraSugiarto Wibowomehul4373Ashley Garcia
Published

The-Owl-tonomous-Aviators

The Owl-tonomous car uses vision and PID to stay on track and glide through curves.

AdvancedFull instructions providedOver 4 days54
The-Owl-tonomous-Aviators

Things used in this project

Hardware components

Raspberry Pi 4 Model B
Raspberry Pi 4 Model B
×1
RC car chasis
×1
Electronic Speed Controller
×1
Optical speed encoder
×1
Servo Motor
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1
ESC Battery
×1
Portable Charger
×1

Software apps and online services

OpenCV
OpenCV

Hand tools and fabrication machines

3D printed camera mount
3D printed Raspberry Pi case

Story

Read more

Custom parts and enclosures

Camera Mount

Schematics

Schematics

Code

main.py

Python
"""
@Description: 
    This project was developed as the final project for ELEC 553. The objective was to design and implement an autonomous vehicle capable of navigating a predefined track using computer vision and control techniques.
@Contributor: 
    Ashley Garcia, Mehul Goel, Sugiarto Wibowo, Valeria Itrat Herrera
"""

import time
import cv2
from motor import MotorController
from encoder import MotorEncoder
from poly_lane2 import LaneTracker as PolyLaneTracker

if __name__ == "__main__":
    motor_ctrl = MotorController()
    motor_enc = MotorEncoder()
    poly_lane_detect = PolyLaneTracker()

    count_frame = 0
    red_cnt = 0
    red_flag = False

    prev_steering = 90
    pred_alpha = 0.3

    red_box_thr = 2500

    try:
        while True:
            # Process the land detection and robot steering angle
            time.sleep(0.02)
            actual_steering, red, img = poly_lane_detect.get_lane_data()
            count_frame += 1
            
            if img is not None:
                cv2.imshow("Perfected View", img)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            # Box red detection
            if count_frame % 3 == 0:
                count_frame = 0
                if red is not None and red > red_box_thr and not red_flag:
                    red_cnt += 1
                    red_flag = True

                    print(f"Red Box Detected! (Size: {red}). Stopping the car! - Box count {red_cnt}")

                    if red_cnt == 1:
                        print("First stop detected!")
                        motor_ctrl.set_speed(0)
                        motor_ctrl.set_steering_pid(7.5)
                        time.sleep(5)
                        motor_ctrl.set_speed(8.1)
                        time.sleep(2)
                    elif red_cnt == 2:
                        print("Second stop detected!")
                        motor_ctrl.stop()
                        break

                if (red is None or red <= red_box_thr) and red_flag:
                    print("Red box disappeared, ready for next detection")
                    red_flag = False

            # Steering angle calculation and update
            if actual_steering is not None:
                delta = actual_steering - prev_steering

                if abs(actual_steering - 90) > 35:
                    print(f"[Warning] Extreme angle {actual_steering:.1f}, dampening")
                    actual_steering = prev_steering + (0.5 * delta)

                predicted_steering = actual_steering + (pred_alpha * delta)
                predicted_steering = max(50, min(130, predicted_steering))
                prev_steering = actual_steering
            else:
                predicted_steering = prev_steering

            print(f"Steering: {actual_steering} | Predicted: {predicted_steering:.1f}", end="\r")
            steering_pwm = motor_ctrl.update_steering(90, predicted_steering)
            motor_ctrl.set_steering_pid(steering_pwm)

            # Update the speed based on calcualted steering angle
            if actual_steering is not None and (actual_steering < 75 or actual_steering > 105):
                target_rpm = 30
            else:
                target_rpm = 40
            rpm = motor_enc.calculate_rpm()
            speed_pwm = motor_ctrl.update_speed(target_rpm, rpm)
            motor_ctrl.set_speed(speed_pwm)
    except KeyboardInterrupt:
        pass

    # Clear the pipeline and GPIOs
    motor_ctrl.stop()
    poly_lane_detect.clean_up()

    # Plot PID and data for debugging
    motor_ctrl.plot_speed_PID()
    motor_ctrl.plot_steering_PID()
    motor_ctrl.plot_combined_run()
    print("\nRobot terminated")

encoder.dts

C/C++
/dts-v1/;
/plugin/;

/ {
    compatible = "brcm,bcm2835";
    fragment@0 {
        target-path = "/";
        __overlay__ {
            encoder_drv {
                compatible = "speed_encoder";
                enc-gpios = <&gpio 23 0>;     /* Pin 23 for Encoder - Active High */
            };
        };
    };
};

poly_lane2.py

Python
import math
import cv2
import numpy as np
from camera import ThreadedCamera

class LaneTracker:
    def __init__(self):
        self.cam = ThreadedCamera(0)
        self.smoothed_error = 0
        self.alpha = 0.3
        self.camera_offset_px = 5
        
        print("Warming up the dumb camera")
        for i in range(10):
          self.cam.get_frame()

        print("--- Lane Detection Pipeline is running ---")

    def get_lane_data(self):
        frame = self.cam.get_frame()  
        if frame is None:
            return None, None, None

        # Setup Frame and HSV
        h, w = frame.shape[:2]
        mid_x = (w/2)
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Blue Masking to detect the lines
        lower_blue = np.array([90, 50, 50])
        upper_blue = np.array([130, 255, 255])
        mask = cv2.inRange(hsv, lower_blue, upper_blue)
        
        # ROI (Region of Interest) to focus on he lines only for lane detection
        roi_mask = np.zeros_like(mask)
        cv2.rectangle(roi_mask, (0, int(h * 0.6)), (w, h), 255, -1)
        masked_blue = cv2.bitwise_and(mask, roi_mask)

        # Initialize Overlay for window
        overlay = np.zeros_like(frame)
        center_fit = None

        # Extract and Cluster Pixels
        pixel_y, pixel_x = np.where(masked_blue == 255)
        if len(pixel_x) > 50:
            sorted_idx = np.argsort(pixel_x)
            sx, sy = pixel_x[sorted_idx], pixel_y[sorted_idx]

            # Find gap between two lanes
            gaps = np.where(np.diff(sx) > 40)[0]
            plot_y = np.linspace(int(h * 0.4), h - 1, 20)

            try:
                if len(gaps) > 0:
                  # Dual Lane Detection
                  split = gaps[0] + 1
                  lx, ly = sx[:split], sy[:split]
                  rx, ry = sx[split:], sy[split:]
              
                  if len(ly) > 10 and len(ry) > 10:
                      l_fit = np.polyfit(ly, lx, 2)
                      r_fit = np.polyfit(ry, rx, 2)
              
                      # --- CENTERING CONSTRAINT ---
                      # At each y level, measure distance from center to each lane
                      # If unequal, shift the center fit until they match
                      check_y = np.linspace(int(h * 0.6), h - 1, 10)  # sample points in ROI
              
                      l_pts = np.polyval(l_fit, check_y)   # left lane x positions
                      r_pts = np.polyval(r_fit, check_y)   # right lane x positions
              
                      dist_to_left  = np.mean(np.polyval((l_fit + r_fit) / 2, check_y) - l_pts)
                      dist_to_right = np.mean(r_pts - np.polyval((l_fit + r_fit) / 2, check_y))
              
                      imbalance = dist_to_left - dist_to_right   # positive = too close to right
                      correction = imbalance / 2                  # shift center by half the gap
              
                      center_fit = (l_fit + r_fit) / 2
                      center_fit[2] += correction                 # shift the constant term only
              
#                      print(f"[Center] L:{dist_to_left:.1f} R:{dist_to_right:.1f} Imbalance:{imbalance:.1f} Correction:{correction:.1f}px")
              
                      # Visualize
                      l_path = np.array([np.transpose(np.vstack([np.polyval(l_fit, plot_y), plot_y]))], np.int32)
                      r_path = np.array([np.transpose(np.vstack([np.polyval(r_fit, plot_y), plot_y]))], np.int32)
                      cv2.polylines(overlay, l_path, False, (255, 0, 0), 2)
                      cv2.polylines(overlay, r_path, False, (255, 0, 0), 2)
            except Exception:
                pass

                
        if center_fit is not None:
          center_at_bottom = np.polyval(center_fit, h-1)
                    
        # Draw the Center Trajectory for car heading (Green Curve)
        if center_fit is not None:
            c_pts = np.polyval(center_fit, plot_y)
            c_path = np.array([np.transpose(np.vstack([c_pts, plot_y]))], np.int32)
            cv2.polylines(overlay, c_path, False, (0, 255, 0), 5)

        # Red Detection for stop
        r_mask = cv2.bitwise_or(
            cv2.inRange(hsv, np.array([0, 70, 50]),   np.array([10, 255, 255])),
            cv2.inRange(hsv, np.array([155, 70, 50]), np.array([180, 255, 255]))
        )
        overlay[r_mask > 0] = (0, 0, 255)

        # Combined frame with overlay
        final_view = cv2.addWeighted(frame, 0.7, overlay, 1.0, 0)

        # Only update smoothed_error when we actually see a lane
        angle = self.calculate_steering_angle(center_fit, frame.shape)
        if angle is not None:
            self.smoothed_error = (self.alpha * (angle - 90) + (1 - self.alpha) * self.smoothed_error)

        steering_angle = 90 + self.smoothed_error
        return steering_angle, np.sum(r_mask) / 255, final_view

    def calculate_steering_angle(self, center_fit, frame_shape):
        """
        Calculates the steering angle from the polynomial center path.
        90 degrees = STRAIGHT. <90 = LEFT. >90 = RIGHT.
        """
        h, w = frame_shape[:2]
        mid_x = w / 2

        if center_fit is None:
            return None   

        target_y = int(h * 0.6)
        target_x = np.polyval(center_fit, target_y)

        dx = target_x - mid_x
        dy = h - target_y

        angle_rad = math.atan2(dx, dy)
        angle_deg = math.degrees(angle_rad)

        # Used for Steering PID error
        steering_angle = int(angle_deg + 90)
        return max(50, min(130, steering_angle))

    def clean_up(self):
        self.cam.release()
        cv2.destroyAllWindows()
        
        
# if __name__ == "__main__":
#     tracker = LaneTracker()
#     try:
#         while True:
#             error, red, img = tracker.get_lane_data()
#             if img is not None:
#                 print(f"Error: {error:5.1f} | Red Area: {red:6.0f}", end="\r")
#                 cv2.imshow("Perfected View", img)
#                 if cv2.waitKey(1) & 0xFF == ord('q'):
#                     break
#     finally:
#         tracker.clean_up()

motor.py

Python
import matplotlib
# Use the 'Agg' backend to avoid memory-heavy GUI windows
matplotlib.use('Agg') 
import numpy as np
import matplotlib.pyplot as plt
import RPi.GPIO as GPIO
import time
from time import sleep

# Minimum DC PWM
min_speed_pwm = 7.5
max_speed_pwm = 8.2
default_speed_pwm = 7.85
steering_forward = 7.5 
steering_right = 9.0
steering_left = 6.0 

steering_trim = 2

# PID Controller
speed_Kp = 0.015
speed_Ki = 0.0072
speed_Kd = 0.0017
#speed_Kp = 0.015
#speed_Ki = 0.0072
#speed_Kd = 0.0017

steering_Kp = 0.1
steering_Ki = 0.00015
steering_Kd = 0.0065
#steering_Kp = 0.1
#steering_Ki = 0.0
#steering_Kd = 0.00659

# Plot for Tuning PID
speed_plot = {
    "rpm_target"  : [],
    "rpm_err"     : [],
    "rpm_actual"  : [],
    "p"           : [], 
    "i"           : [],
    "d"           : [],
    "pwm"         : [],
}
steering_plot = {
    "steering_target"  : [],
    "steering_err"     : [],
    "steering_actual"  : [],
    "p"           : [], 
    "i"           : [],
    "d"           : [],
    "pwm"         : [],
}

speed_pin = 18
steering_pin = 19
"""
    Control Motor
"""
class MotorController:
    def __init__(self):
        self.last_steering_pwm = 7.5

        # PID Controller Init
        self.pid_speed_integral = 0
        self.pid_speed_last_error = 0
        self.pid_speed_last_time = time.time()
        self.pid_steering_integral = 0
        self.pid_steering_last_error = 0
        self.pid_steering_last_time = time.time()
        
        # Pin Initialization
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(speed_pin, GPIO.OUT)
        GPIO.setup(steering_pin, GPIO.OUT)
        self.speed_pwm = GPIO.PWM(speed_pin, 50) # set the frequency to 50 Hz 
        self.steering_pwm = GPIO.PWM(steering_pin, 50) # set the frequency to 50 Hz 
        
        # Initialization
        self.speed_pwm.start(7.5) # Don't change the 7.5 DC because it is for initialization
        self.steering_pwm.start(steering_forward) # Start with the specified duty cycle
        sleep(2)
        
#         Speed init if needed
#        self.speed_pwm.start(10.0) # Don't change the 7.5 DC because it is for initialization
#        sleep(2)
#        self.speed_pwm.start(5.0) # Don't change the 7.5 DC because it is for initialization
#        sleep(2)
#        self.speed_pwm.start(7.5) # Don't change the 7.5 DC because it is for initialization
        
    def set_speed(self, duty_cycle):
        self.speed_pwm.ChangeDutyCycle(duty_cycle)

    def set_steering(self, angle="forward"):
        if angle == "forward":
            self.steering_pwm.ChangeDutyCycle(steering_forward)
        elif angle == "right":
            self.steering_pwm.ChangeDutyCycle(steering_right)
        elif angle == "left":
            self.steering_pwm.ChangeDutyCycle(steering_left)

    def set_steering_pid(self, pwm):
        self.steering_pwm.ChangeDutyCycle(pwm)

    def stop(self):
        self.set_speed(7.5)
        self.set_steering_pid(7.5)

    def update_speed(self, target_RPM, measured_RPM):
        current_time = time.time()
        dt = current_time - self.pid_speed_last_time

        # Prevent zero DC
        if dt < 0.010:
            return 7.5
        
        # Calculate the difference between target and measured
        error = target_RPM - measured_RPM
        
        # PID Calculation
        _P = speed_Kp * error
        self.pid_speed_integral += error * dt   # Accumulate error overtime for calculating Integral part
        _I = speed_Ki * self.pid_speed_integral
        _D =  speed_Kd * ((error - self.pid_speed_last_error) / dt)
        
        # New adjustment
        output = min_speed_pwm + (_P + _I + _D)
        final_pwm = max(min_speed_pwm, min(max_speed_pwm, output))  # ADC Mapping for final Duty Cycle PWM

#        print(f"[Speed - RPM] Measured: {measured_RPM:6.1f} | Error: {error:6.1f} --- PID({_P:1.3f}, {_I:1.3f}, {_D:1.3f}) -> PWM: {final_pwm:5.3f}%")

        # For tuning the PID
        speed_plot["rpm_target"].append(target_RPM)
        speed_plot["rpm_actual"].append(measured_RPM)
        speed_plot["rpm_err"].append(error) 
        speed_plot["p"].append(_P) 
        speed_plot["i"].append(_I)
        speed_plot["d"].append(_D)
        speed_plot["pwm"].append(final_pwm)

        # Update state
        self.pid_speed_last_time = current_time
        self.pid_speed_last_error = error
        return final_pwm

    def update_steering(self, target_angle, actual_angle):
        if actual_angle is None:
            return self.last_steering_pwm  # Neutral/Straight

        now = time.time()
        dt = now - self.pid_steering_last_time
        
        if dt < 0.01: 
          return self.last_steering_pwm

        # Calculate Error
        error = (actual_angle - target_angle)
        
        if(abs(error) < 2):
          error = 0
        
        # PID Calculation
        _P = steering_Kp * error
        self.pid_steering_integral += error * dt
        self.pid_steering_integral = max(-20, min(20, self.pid_steering_integral))  # anti-windup clamp
        _I = steering_Ki * self.pid_steering_integral
        _D = steering_Kd * (error - self.pid_steering_last_error) / dt
        raw_correction = _P + _I + _D

        # Apply Sensitivity and Neutral Offset
        final_pwm = 7.5 + (raw_correction * 2.6)
        
        # Final Hardware Clamp
        final_pwm = max(steering_left, min(steering_right, final_pwm))

        # Save data for your plots
        steering_plot["steering_target"].append(target_angle)
        steering_plot["steering_actual"].append(actual_angle)
        steering_plot["steering_err"].append(error) 
        steering_plot["p"].append(_P)
        steering_plot["i"].append(_I) # We are using PD now, so I is 0
        steering_plot["d"].append(_D)
        steering_plot["pwm"].append(final_pwm)

#        print(f"[Steering - Degree] Measured: {actual_angle:6.1f} | Error: {error:6.1f} --- PID({_P:1.3f}, {_I:1.3f}, {_D:1.3f}) -> PWM: {final_pwm:5.3f}%")

        self.pid_steering_last_time = now
        self.pid_steering_last_error = error
        self.last_steering_pwm = final_pwm
        return final_pwm
    
    def save_pid_plot(self, data_dict, filename, title, y_label_top):
        """Helper function to handle all PID plotting logic."""
        # Check for data and synchronized keys
        if not data_dict or len(next(iter(data_dict.values()))) < 2:
            print(f"Not enough data to plot {title}.")
            return

        try:
            # Sync lengths and create snapshots
            min_len = min(len(data_dict[k]) for k in data_dict.keys())
            
            # Map the specific keys for speed/steering dynamically
            p_val = np.array(data_dict["p"][:min_len])
            i_val = np.array(data_dict["i"][:min_len])
            d_val = np.array(data_dict["d"][:min_len])
            
            # Identify which keys to use for the top plot
            k_target = [k for k in data_dict if "target" in k][0]
            k_actual = [k for k in data_dict if "actual" in k][0]
            k_err    = [k for k in data_dict if "err" in k][0]

            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)

            # Top Plot: Performance
            ax1.plot(np.array(data_dict[k_target][:min_len]), label="Target", linestyle="--", alpha=0.8)
            ax1.plot(np.array(data_dict[k_actual][:min_len]), label="Actual", linewidth=2)
            ax1.plot(np.array(data_dict[k_err][:min_len]), label="Error", linestyle=":", color='gray')
            ax1.set_ylabel(y_label_top)
            ax1.set_title(title)
            ax1.legend(loc='upper right')

            # Bottom Plot: PID Terms
            ax2.plot(p_val, label="P (Proportional)")
            ax2.plot(i_val, label="I (Integral)")
            ax2.plot(d_val, label="D (Derivative)")
            ax2.set_ylabel("Correction (PWM)")
            ax2.set_xlabel("Sample Number")
            ax2.legend(loc='upper right')

            plt.subplots_adjust(hspace=0.3) 
            plt.savefig(filename, dpi=100)
            plt.close(fig)
            print(f"Successfully saved: {filename}")

        except Exception as e:
            print(f"Failed to plot {title}: {e}")
            
    def plot_combined_run(self, filename="DutyCycle_SpeedSteering.png"):
        """Plots Error, Steering PWM, and Speed PWM on a single unified plot."""
        # Ensure we have the 'pwm' keys in our dictionaries
        if "pwm" not in steering_plot or "pwm" not in speed_plot:
            print("Error: 'pwm' data not found in plot dictionaries.")
            return

        if not steering_plot["steering_err"] or not speed_plot["pwm"]:
            print("Not enough data for combined plot.")
            return

        try:
            # Sync lengths
            min_len = min(len(steering_plot["steering_err"]), 
                          len(steering_plot["pwm"]), 
                          len(speed_plot["pwm"]))
            
            frames = np.arange(min_len)
            error_data = np.array(steering_plot["steering_err"][:min_len])
            steer_pwm_data = np.array(steering_plot["pwm"][:min_len])
            speed_pwm_data = np.array(speed_plot["pwm"][:min_len])

            fig, ax1 = plt.subplots(figsize=(12, 7))

            # --- Primary Axis: Error (Degrees) ---
            lns1 = ax1.plot(frames, error_data, color='red', label="Steering Error (Deg)", alpha=0.7)
            ax1.axhline(0, color='red', linestyle='--', alpha=0.3)
            ax1.set_xlabel("Frame Number")
            ax1.set_ylabel("Error (Degrees)", color='red')
            ax1.tick_params(axis='y', labelcolor='red')

            # --- Secondary Axis: Duty Cycles (%) ---
            ax2 = ax1.twinx()
            lns2 = ax2.plot(frames, steer_pwm_data, color='blue', label="Steering PWM (%)", linewidth=2)
            lns3 = ax2.plot(frames, speed_pwm_data, color='green', label="Speed PWM (%)", linewidth=2)
            ax2.axhline(7.5, color='blue', linestyle=':', alpha=0.3) # Steering Neutral Reference
            ax2.set_ylabel("Duty Cycle (%)", color='black')
            
            # Add a title and adjust layout
            plt.title("Combined Performance: Error vs Duty Cycles")
            
            # Combine legends from both axes into one box
            lns = lns1 + lns2 + lns3
            labs = [l.get_label() for l in lns]
            ax1.legend(lns, labs, loc='upper right', framealpha=0.9)

            fig.tight_layout()
            plt.savefig(filename, dpi=150)
            plt.close(fig)
            print(f"Successfully saved single-frame combined plot: {filename}")

        except Exception as e:
            print(f"Failed to generate combined plot: {e}")

    # --- Simplified Callers ---
    def plot_speed_PID(self):
        self.save_pid_plot(speed_plot, "PID_speed.png", "Speed PID Response", "RPM")

    def plot_steering_PID(self):
        self.save_pid_plot(steering_plot, "PID_steering.png", "Steering PID Response", "Degrees")


# if __name__ == "__main__":
#     motor_controller = MotorController() # Initialize motor controller with default pins and steering angle
#     try:
#         while True:
#             # Move forward
#             motor_controller.set_steering("forward")
#             motor_controller.set_speed(default_speed_pwm)
#             sleep(1)
        
#             # Stop
#             motor_controller.set_speed(0)
#             sleep(0.5)
        
#             # Turn left
#             motor_controller.set_steering("left")
#             motor_controller.set_speed(default_speed_pwm-0.1) # Set speed back to default
#             sleep(2)
        
#             # Turn right
#             motor_controller.set_steering("right")
#             motor_controller.set_speed(default_speed_pwm-0.1) # Set speed back to default
#             sleep(2)
        
#             # Turn straight
#             motor_controller.set_steering("left")
#             motor_controller.set_speed(default_speed_pwm-0.1) # Set speed back to default
#             sleep(1)
#     except KeyboardInterrupt:
#         # Terminate motor controller and clean up GPIO settings
#         motor_controller.stop()
#         pass

    

encoder.py

Python
import RPi.GPIO as GPIO
import time

"""
    Calculate Motor RPM
"""
class MotorEncoder:
    def __init__(self):
        self.param_path = "/sys/module/EncoderDriver/parameters/enc_count"
        self.last_count = self._read_enc()
        self.last_time = time.time()
        self.holes_per_rev = 20
        self.rpm_history = [0, 0, 0, 0, 0] # For filter

    def _read_enc(self):
        """Read the raw count from the kernel module"""
        try:
            with open(self.param_path, "r") as f:
                return int(f.read().strip())
        except FileNotFoundError:
            print(f"Error: {self.param_path} not found")
            return 0

    def calculate_rpm(self):
        """Calculate the RPM"""
        current_count = self._read_enc()
        current_time = time.time()

        # Calculate deltas based on previous count
        delta_count = current_count - self.last_count
        delta_time = current_time - self.last_time

        if delta_time <= 0:
            return 0.0

        # RPM calculation
        rpm = (delta_count / delta_time) * (60 / self.holes_per_rev)

        # Ignore if there is outlier RPM
        if rpm > 5000:
            return 0.0 # Or return the last known good RPM

        # Apply filtering using averaging
        self.rpm_history.append(rpm)
        self.rpm_history.pop(0)
        smoothed_rpm = sum(self.rpm_history) / len(self.rpm_history)

        # Update time and count
        self.last_count = current_count
        self.last_time = current_time
        return smoothed_rpm

# if __name__ == "__main__":
#     reader = MotorEncoder()
#     try:
#         while True:
#             time.sleep(1) # Measure every second
#             print(f"Current Speed: {reader.calculate_rpm():.2f} RPM")
#     except KeyboardInterrupt:
#         print("\nStopped.")

camera.py

Python
import cv2
import threading

class ThreadedCamera:
    def __init__(self, src=0):
        self.cam = cv2.VideoCapture(src)
        self.cam.set(cv2.CAP_PROP_FRAME_WIDTH, 120)
        self.cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 80)
        self.cam.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        self.frame = None
        self.lock = threading.Lock()
        self.running = True

        # Start background capture thread
        self.thread = threading.Thread(target=self._capture_loop, daemon=True)
        self.thread.start()

    def _capture_loop(self):
        while self.running:
            ret, frame = self.cam.read()
            if ret:
                with self.lock:
                    self.frame = frame   # always overwrite with latest

    def get_frame(self):
        with self.lock:
            return self.frame.copy() if self.frame is not None else None

    def release(self):
        self.running = False
        self.thread.join()
        self.cam.release()

EncoderDriver.c

C/C++
#include <linux/module.h>
#include <linux/init.h>
#include <linux/of_device.h>
#include <linux/kernel.h>
#include <linux/gpio.h>
#include <linux/gpio/consumer.h>
#include <linux/platform_device.h>
#include <linux/interrupt.h>
#include <linux/ktime.h>
#include <linux/mod_devicetable.h>

// Timer variables
ktime_t now;
static ktime_t lastTime;

// GPIO variables
unsigned int irq_enc_num;
struct gpio_desc *enc;
static int enc_count = 0;
module_param(enc_count, int, 0644);

/**
 * @brief Interrupt service routine is called, when interrupt is triggered
 */
static irqreturn_t gpio_irq_handler(int irq, void *dev_id)
{
	s64 time_diff_ns;

	// Verify debounce time
	now = ktime_get();
	time_diff_ns = ktime_to_ns(ktime_sub(now, lastTime));

	// Debounce is 1ms because we have 20-hole wheel
	if(time_diff_ns < 10000000)
	{
		printk("Pressed is ignored\n");
		return IRQ_HANDLED;
	}

	// Count encoder
    enc_count++;
	lastTime = ktime_get();

  return IRQ_HANDLED;
}

// probe function
static int enc_probe(struct platform_device *pdev)
{
  	// Initiate Button
  	struct device *dev = &pdev->dev;
  	enc = devm_gpiod_get_index(dev, "enc", 0, GPIOD_IN);
  
  	// Register Button IRQ
  	irq_enc_num = gpiod_to_irq(enc);
  	if(request_irq(irq_enc_num, gpio_irq_handler, IRQF_TRIGGER_FALLING, "enc_irq", NULL) != 0){
		printk("Error!\nCan not request interrupt nr.: %d\n", irq_enc_num);
		return -1;
	}
	printk("Sucessfully insert Encoder module\n");
	return 0;
}

// remove function
static void enc_remove(struct platform_device *pdev)
{
	// Release IRQ
  	free_irq(irq_enc_num, NULL);
  	printk("Removing encoder\n");
}

static struct of_device_id matchy_match[] = {
    { .compatible = "speed_encoder"},
    {/* leave alone - keep this here (end node) */},
};

// platform driver object
static struct platform_driver enc_drv = {
	.probe	 = enc_probe,
	.remove	 = enc_remove,
	.driver	 = {
	       .name  = "Encoder driverr",
	       .owner = THIS_MODULE,
	       .of_match_table = matchy_match,
	},
};

module_platform_driver(enc_drv);

MODULE_DESCRIPTION("Encoder driver for Final Project - ELEC 553");
MODULE_AUTHOR("Owlt-onomous");
MODULE_LICENSE("GPL v2");
MODULE_ALIAS("platform:encoder_driver");

GitHub Repository

Credits

Valeria Itrat Herrera
1 project • 0 followers
Sugiarto Wibowo
1 project • 0 followers
mehul4373
0 projects • 0 followers
Ashley Garcia
0 projects • 0 followers

Comments