donutsorelse
Published under GPL3+

The Ultimate Sentry

Harmlessly scare away predators that would otherwise eat or harm plants and animals on a property.

Expert · Work in progress · 5 days · 220

Things used in this project

Hardware components

NXP NavQ (NavQPlus)
×1
Bosch BME688
×1
Wyze Cam v2
×1

Software apps and online services

Microsoft Visual Studio Code Extension for Arduino
Visual Studio Code (used for Python)
jMAVSim

Story


Code

drone_controller

Python
This controls all logic for the drone
import cv2
import numpy as np
import RPi.GPIO as GPIO
from pygame import mixer
import board
import analogio
import navq.drone
import navq.camera
import navq.gps
import configparser
import time
import smbus
import requests
import json
import math
import GPS
import rospy
from geometry_msgs.msg import Twist
import bme680
import dronekit
from twilio.rest import Client
import asyncio
from mavsdk import System
from mavsdk.mission import (MissionItem, MissionPlan)

sensor = bme680.BME680(i2c_addr=bme680.I2C_ADDR_PRIMARY)
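# Note: the BME688 is driven here through the bme680 library; its temperature,
# humidity, pressure, and gas-resistance readings behave the same way, though the
# BME688-specific gas-scanning features are not used here.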
sensor.set_humidity_oversample(bme680.OS_2X)
sensor.set_pressure_oversample(bme680.OS_4X)
sensor.set_temperature_oversample(bme680.OS_8X)
sensor.set_filter(bme680.FILTER_SIZE_3)
sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)


# Connect to the drone's serial port
vehicle = dronekit.connect("/dev/ttyACM0", baud=921600, wait_ready=True)


# Set up GPIO (the BME688 itself is read over I2C by the bme680 library above)
GPIO.setmode(GPIO.BCM)
GPIO.setup(17, GPIO.IN)

# Setup GPIO pins for controlling the light
GPIO.setwarnings(False)
GPIO.setup(18, GPIO.OUT)
# Define the Bosch Sensor address
BOSCH_SENSOR_ADDRESS = 0x77

# Define the bus object
bus = smbus.SMBus(1)

# API key for OpenWeatherMap
api_key = 'YOUR_API_KEY_HERE'

# URL for OpenWeatherMap API
url = 'https://api.openweathermap.org/data/2.5/weather'

return_home_flag=False
# Define the coordinates for the safety zone
zone_min_lat, zone_min_long, zone_max_lat, zone_max_long = None, None, None, None

# Define the trained Haar Cascade Classifier
predator_cascade = cv2.CascadeClassifier('predator_cascade.xml')

# Define the path to the configuration file
CONFIG_FILE = 'config.ini'

# Initialize the NavQPlus drone object
drone = navq.drone.Drone()

async def connect_to_drone():
    drone = System()
    await drone.connect(system_address="serial:///dev/ttyACM0:921600")
    return drone

# Initialize the camera module
camera = navq.camera.Camera()

BUTTON_PIN = 22
GPIO.setup(BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)


# Initialize the Pygame mixer for playing sounds
mixer.init()

# Create an instance of the analog input pin for the light sensor
light_sensor = analogio.AnalogIn(board.LIGHT)
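# Note: board.LIGHT assumes a board whose CircuitPython build exposes an onboard
# light-sensor pin; substitute the appropriate analog pin for your hardware.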
# Define the home location
home_location = None
# Start the video stream
cap = cv2.VideoCapture(0)
bad_weather=False
break_main_loop=False

time_check_short=time.time()
time_check_weather=time.time()
weather_detection_reset=True # track when weather has been deemed good so that we only send text messages for new events
WEATHER_TIME_RESET_LENGTH = 900
RAIN_ALERT_MSG = "Rain detected by BME688 sensor. Heading home."
FIRE_ALERT_MSG = "FIRE DETECTED!!!!!!!"
WEATHER_FORECAST_ISSUE_DETECTED = "Heading home. There is a high chance of " #generic message to precede current weather issue
USER_PHONE_NUMBER="+1" #fill in your phone number
TWILIO_PHONE_NUMBER="+1" #You'll need to setup a Twilio number and add it here
predator_intervals = {}  # Maps detection index to the time of its last alert
DETECTION_INTERVAL = 5
PREDATOR_ALERT_MSG = "Predator detected!"


# Initialize variables to store the previous sensor readings
last_temperature = None
last_humidity = None
last_pressure = None
last_gas_resistance = None

# Twilio account credentials
account_sid = 'your_account_sid'
auth_token = 'your_auth_token'
client = Client(account_sid, auth_token)
safe_cascade = cv2.CascadeClassifier('path/to/safe_cascade.xml')
predator_cascade = cv2.CascadeClassifier('path/to/predator_cascade.xml')
uncertain_cascade = cv2.CascadeClassifier('path/to/uncertain_cascade.xml')



def send_update_message(txt):
    client.messages.create(
        to=USER_PHONE_NUMBER,
        from_=TWILIO_PHONE_NUMBER,
        body=txt
    )
        
# Whenever there's bad weather, make sure we return to home and stay there until we think the weather is acceptable again
def update_weather_clock(txt):
    global return_home_flag, bad_weather, time_check_weather, weather_detection_reset
    bad_weather = return_home_flag = True
    # Don't inundate the user with weather messages
    if weather_detection_reset:
        weather_detection_reset=False
        send_update_message(txt)
    time_check_weather=time.time()

def detect_weather():
    """
    Detects weather conditions using the BME688 sensor.
    Returns "Fire" if fire is detected, "Rain" if rain is detected, "Clear" otherwise.
    """
    global last_humidity, last_temperature, last_pressure, last_gas_resistance

    # Read the humidity, temperature, pressure, and gas resistance levels from the BME688 sensor
    if sensor.get_sensor_data():
        humidity = sensor.data.humidity
        temperature = sensor.data.temperature
        pressure = sensor.data.pressure
        gas_resistance = sensor.data.gas_resistance
    # Check if there is a significant change in humidity, temperature, pressure, or gas resistance
    if last_humidity is not None and last_temperature is not None and last_pressure is not None and last_gas_resistance is not None:
        if abs(last_humidity - humidity) > 10 or abs(last_temperature - temperature) > 5 or abs(last_pressure - pressure) > 50 or abs(last_gas_resistance - gas_resistance) > 500:
            update_weather_clock(RAIN_ALERT_MSG)
            
    # Check if there is a significant change in temperature and air quality
    if (abs(temperature-last_temperature) > 5 and abs(last_gas_resistance - gas_resistance) > 500) or temperature > 110 or gas_resistance > 5000:
        #TODO - Send out fire alert
        update_weather_clock(FIRE_ALERT_MSG)

    # Store the current readings as the last readings
    last_humidity, last_temperature, last_pressure, last_gas_resistance = humidity, temperature, pressure, gas_resistance

def get_local_weather(current_lat, current_long):
    # Parameters for API request (coordinates for the drone's current location)
    params = {'lat': current_lat, 'lon': current_long, 'appid': api_key}

    # Send API request and get response
    response = requests.get(url, params=params)
    data = json.loads(response.text)

    # Check weather conditions
    weather = data['weather'][0]['main']
    if weather == 'Thunderstorm' or weather == 'Rain' or weather == 'Snow' or weather == 'Extreme' or weather == 'Windy':
        # Flying conditions are too dangerous, send SMS alert and return to start
        update_weather_clock(WEATHER_FORECAST_ISSUE_DETECTED + weather)
    else:
        detect_weather()

def button_pressed():
    return GPIO.input(BUTTON_PIN) == GPIO.LOW

# Define the function to set the home location
def set_home_location():
    global home_location
    while True:
        # Display the message on the screen
        ret, frame = cap.read()
        cv2.putText(frame, "Press button to set home location", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
        cv2.imshow("Drone Object Detection", frame)

        # Check if the button is pressed
        if button_pressed:
            # Get the drone's current GPS coordinates and set them as the home location
            home_location = dronekit.LocationGlobalRelative(vehicle.location.global_relative_frame.lat,
                                                             vehicle.location.global_relative_frame.lon,
                                                             vehicle.location.global_relative_frame.alt)
            print("Home location set to:", home_location)
            break

# Define the dimensions of the safety zone
safety_zone = {'top_left': None, 'top_right': None, 'bottom_left': None, 'bottom_right': None}

# Define the button to set the safety zone
safety_zone_button = navq.gps.Location(-34.364114, 149.166022, 10)

# Define the display to show the safety zone status
safety_zone_display = "Safety zone not set"

def set_safety_zone():
    global safety_zone_display
    # Display instructions for setting the safety zone
    safety_zone_display = "Press button to set top left corner"
    print(safety_zone_display)

    # Wait for the button to be pressed to set the top left corner
    while True:
        button_pressed = navq.gps.location_callback.check_location(safety_zone_button)
        if button_pressed:
            safety_zone['top_left'] = drone.vehicle.location.global_relative_frame
            break

    # Display instructions for setting the safety zone
    safety_zone_display = "Press button to set top right corner"
    print(safety_zone_display)

    # Wait for the button to be pressed to set the top right corner
    while True:
        button_pressed = navq.gps.location_callback.check_location(safety_zone_button)
        if button_pressed:
            safety_zone['bottom_left'] = drone.vehicle.location.global_relative_frame
            break

    # Display instructions for setting the safety zone
    safety_zone_display = "Press button to set bottom right corner"
    print(safety_zone_display)

    # Wait for the button to be pressed to set the bottom right corner
    while True:
        button_pressed = navq.gps.location_callback.check_location(safety_zone_button)
        if button_pressed:
            safety_zone['bottom_right'] = drone.vehicle.location.global_relative_frame
            break

    # Display the safety zone coordinates
    safety_zone_display = f"Safety zone set:\n{str(safety_zone)}"
    print(safety_zone_display)

    # Save the safety zone coordinates to a configuration file
    config = configparser.ConfigParser()
    config['SafetyZone'] = {
        'TopLeftLat': str(safety_zone['top_left'].lat),
        'TopLeftLon': str(safety_zone['top_left'].lon),
        'TopRightLat': str(safety_zone['top_right'].lat),
        'TopRightLon': str(safety_zone['top_right'].lon),
        'BottomLeftLat': str(safety_zone['bottom_left'].lat),
        'BottomLeftLon': str(safety_zone['bottom_left'].lon),
        'BottomRightLat': str(safety_zone['bottom_right'].lat),
        'BottomRightLon': str(safety_zone['bottom_right'].lon)
    }

    with open('config.ini', 'w') as configfile:
        config.write(configfile)

    print("Safety zone coordinates saved to config.ini file.")
# Function to check if the drone is within the safety zone
def is_in_safety_zone(lat, long):
    # If the zone bounds have not been configured yet, assume the drone is inside the zone
    if None in (zone_min_lat, zone_min_long, zone_max_lat, zone_max_long):
        return True
    return zone_min_lat <= lat <= zone_max_lat and zone_min_long <= long <= zone_max_long

# Function to determine the safe direction to move the drone
def determine_safe_direction(obstacle_location, image_width):
    # Determine the direction of the obstacle relative to the drone
    obstacle_direction = obstacle_location - (image_width / 2)

    # Determine the safe direction to move the drone
    return 'right' if obstacle_direction >= 0 else 'left'


def move(direction, distance=1):
    # Get the current location of the drone
    current_location = vehicle.location.global_frame
    current_lat = current_location.lat
    current_long = current_location.lon

    # Approximate conversion from metres to degrees (adequate for the small offsets used here)
    meters_to_deg_lat = 1.0 / 111111.0
    meters_to_deg_long = 1.0 / (111111.0 * math.cos(math.radians(current_lat)))

    # Calculate the target coordinates based on the direction (assumes a north-facing reference frame)
    if direction == 'left':
        target_lat = current_lat
        target_long = current_long - distance * meters_to_deg_long
    elif direction == 'right':
        target_lat = current_lat
        target_long = current_long + distance * meters_to_deg_long
    elif direction == 'forward':
        target_lat = current_lat + distance * meters_to_deg_lat
        target_long = current_long
    elif direction == 'backward':
        target_lat = current_lat - distance * meters_to_deg_lat
        target_long = current_long
    else:
        print("Invalid direction provided. Please use 'left', 'right', 'forward', or 'backward'.")
        return

    # Command the drone to move to the target coordinates
    vehicle.simple_goto(dronekit.LocationGlobalRelative(target_lat, target_long, vehicle.location.global_relative_frame.alt))


# Function to turn on the light
def turn_on_light():
    GPIO.output(18, GPIO.HIGH)

# Function to turn off the light
def turn_off_light():
    GPIO.output(18, GPIO.LOW)

# Function to play a sound
def play_sound(sound_file):
    mixer.music.load(sound_file)
    mixer.music.play()

# Function to check if it's dark outside
def is_dark(threshold):
    # Get the current light level from the light sensor
    light_level = light_sensor.value

    # Convert the light level to a value between 0 and 1
    normalized_light_level = light_level / 65535

    # Determine if it's dark based on the threshold value
    return normalized_light_level < threshold


# Function to load the safety zone coordinates from the configuration file
def load_safety_zone():
    config = configparser.ConfigParser()
    config.read(CONFIG_FILE)
    if 'SafetyZone' in config:
        global safety_zone
        safety_zone = {
            'top_left': (float(config['SafetyZone']['TopLeftLat']), float(config['SafetyZone']['TopLeftLon'])),
            'top_right': (float(config['SafetyZone']['TopRightLat']), float(config['SafetyZone']['TopRightLon'])),
            'bottom_left': (float(config['SafetyZone']['BottomLeftLat']), float(config['SafetyZone']['BottomLeftLon'])),
            'bottom_right': (float(config['SafetyZone']['BottomRightLat']), float(config['SafetyZone']['BottomRightLon']))
        }
        print("Loaded safety zone coordinates from config.ini file.")
    else:
        print("Safety zone coordinates not found in config.ini file.")
        # Walk the user through setting the safety zone instead
        set_safety_zone()

load_safety_zone()



def arm_and_takeoff(altitude):
    """
    Arms the drone and takes off to a specified altitude.
    """
    print("Arming drone...")
    vehicle.mode = dronekit.VehicleMode("GUIDED")
    vehicle.armed = True
    while not vehicle.armed:
        print("Waiting for arming...")
        time.sleep(1)
        
    print("Taking off...")
    vehicle.simple_takeoff(altitude)
    while True:
        current_alt = vehicle.location.global_relative_frame.alt
        if current_alt >= altitude * 0.95:
            print("Altitude reached")
            break
        print("Current altitude: ", current_alt)
        time.sleep(1)

def return_to_base():
    """
    Sends the drone back to the starting position.
    """
    print("Returning to base...")
    vehicle.mode = dronekit.VehicleMode("RTL")

async def take_off(drone, altitude):
    print("Arming...")
    await drone.action.arm()

    print("Taking off...")
    await drone.action.takeoff()
    await asyncio.sleep(5)

    await drone.action.set_maximum_speed(5.0)

    print(f"Ascending to {altitude} meters...")
    await drone.action.goto_location(drone.location.current().latitude, drone.location.current().longitude, altitude, 0)

    # Wait for the drone to reach target altitude
    while True:
        altitude = await drone.telemetry.position()
        if altitude.relative_altitude_m >= altitude * 0.95:
            print("Reached target altitude.")
            break
        print("Current altitude: ", altitude.relative_altitude_m)
        await asyncio.sleep(1)
        
        
def obstacle_avoidance(drone, sonar, lidar, stereo_cams):
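    # Note: sonar, lidar, and stereo_cams are assumed to be wrapper objects that
    # expose read()/read_distance() methods returning distances in metres.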
    # Read values from sonar sensor
    sonar_distance = sonar.read()
    
    # Read values from LIDAR sensor
    lidar_distance = lidar.read()
    
    # Read values from stereo cameras
    stereo_distance = stereo_cams.read_distance()
    
    # Calculate a weighted average of the distances, giving more importance to the LIDAR and stereo cameras
    avg_distance = (sonar_distance + 2 * lidar_distance + 2 * stereo_distance) / 5
    
    # Check if obstacle is present
    if avg_distance < drone.obstacle_threshold:
        # Fly around obstacle
        drone.fly_around()
        
        # If unable to fly around, stop and hover
        if not drone.flying_around:
            drone.stop_and_hover()
            
            # If unable to hover, emergency land
            if not drone.hovering:
                drone.emergency_land()
                
    # No obstacle present, continue with normal flight
    else:
        drone.continue_flight()
        
    # Sleep for a specified amount of time before checking again
    time.sleep(0.5)


def safe_land():
    # Initialize the ROS node
    rospy.init_node('safe_land', anonymous=True)

    # Create a publisher for the Twist message
    pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)

    # Set the rate at which the drone will land
    rate = rospy.Rate(10)

    # Create a Twist message to set the linear and angular velocity
    velocity = Twist()
    velocity.linear.x = 0
    velocity.linear.y = 0
    velocity.linear.z = -0.5
    velocity.angular.x = 0
    velocity.angular.y = 0
    velocity.angular.z = 0

    # Continue to publish the Twist message until the drone has landed
    while not rospy.is_shutdown():
        pub.publish(velocity)
        rate.sleep()

    # Once the drone has landed, stop the node
    rospy.signal_shutdown("Safe landing complete")

async def fly_to_location(drone, latitude, longitude):
    """
    Commands the drone to fly to a specified GPS location.
    """
    # Start MAVSDK instance
    await drone.connect()

    # Create the mission (this positional MissionItem constructor matches older
    # MAVSDK-Python releases; newer releases expect additional fields)
    mission_item = MissionItem(
        latitude,
        longitude,
        vehicle.location.global_relative_frame.alt,
        0,  # Speed (m/s)
        False,  # is_fly_through - stop at the waypoint
        float('nan'),  # No gimbal pitch
        float('nan'),  # No gimbal yaw
        MissionItem.CameraAction.NONE,
        0,  # No loiter time
        float('nan')  # No camera photo interval
    )

    # Upload mission
    mission_plan = MissionPlan([mission_item])
    await drone.mission.upload_mission(mission_plan)

    # Arm and take off
    print("Arming and taking off...")
    await drone.action.arm()
    await drone.action.takeoff()

    # Start mission
    print("Starting mission...")
    await drone.mission.start_mission()

    # Monitor mission progress
    async for mission_progress in drone.mission.mission_progress():
        print(f"Mission progress: {mission_progress.current_item_index}/{mission_progress.mission_count}")
        if mission_progress.current_item_index == mission_progress.mission_count:
            print("Reached target location")
            break

        # Check if it's dark outside and turn on/off the light accordingly
        if is_dark(0.5):
            turn_on_light()
        else:
            turn_off_light()
        main_loop() # This allows us to continue considering all aspects of the flight (predators, obstacles, weather, etc)


async def returnToStart(start_latitude, start_longitude):
    print("Drone is outside safe distance of starting location. Returning to starting location.")
    # Fly back to the starting location
    await fly_to_location(drone, start_latitude, start_longitude)


def determineSafeDirection(obstacle_location, drone_location):
    # Calculate the vector from the drone to the obstacle
    vector_to_obstacle = obstacle_location - drone_location

    # Normalize the vector
    normalized_vector = vector_to_obstacle / np.linalg.norm(vector_to_obstacle)

    # Find a vector perpendicular to the vector to the obstacle
    safe_direction = np.array([-normalized_vector[1], normalized_vector[0]])

    return safe_direction


def follow_predator(vehicle, predator):
    """
    Sends the drone to follow the predator.
    """
    # Calculate the centroid of the predator
    x, y, w, h = predator
    predator_x = x + w/2
    predator_y = y + h/2
    # Get the current location of the drone
    current_location = vehicle.location.global_frame

    # Set the target altitude to the current altitude
    target_altitude = current_location.alt

    # Calculate the target location based on the position of the predator
    target_location = dronekit.LocationGlobalRelative(current_location.lat, current_location.lon, target_altitude)
    target_location.lat += predator_y * 0.000001
    target_location.lon += predator_x * 0.000001

    # Fly to the target location
    vehicle.simple_goto(target_location)

    # Wait for the drone to reach the target location (within roughly a metre)
    while vehicle.mode.name == "GUIDED":
        current_location = vehicle.location.global_frame
        # dronekit locations have no built-in distance method, so approximate ground distance from the lat/lon deltas
        dlat = target_location.lat - current_location.lat
        dlon = target_location.lon - current_location.lon
        if math.sqrt(dlat * dlat + dlon * dlon) * 1.113195e5 < 1:
            break
        time.sleep(0.5)
        
    # Return the current location of the drone after reaching the target
    return current_location



# Function to send photos
def send_photo(image_path, msg):
    # Note: Twilio's media_url must be a publicly accessible URL - a raw local file
    # path won't work. Host the saved image (e.g. on your own server or a bucket)
    # and pass that URL through here.
    message = client.messages.create(
        body=msg,
        media_url=[image_path],
        to=USER_PHONE_NUMBER,
        from_=TWILIO_PHONE_NUMBER
    )


def detect_objects(image):
    img_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Detect and process safe objects
    safe_objects = safe_cascade.detectMultiScale(img_gray, 1.1, 4)
    img_gray = process_safe_objects(img_gray, safe_objects)

    # Detect and process predators and uncertain objects
    predators = predator_cascade.detectMultiScale(img_gray, 1.1, 4)
    uncertain_objects = uncertain_cascade.detectMultiScale(img_gray, 1.1, 4)
    process_predators_and_uncertains(image, predators, uncertain_objects)

def process_safe_objects(img_gray, safe_objects):
    for (x, y, w, h) in safe_objects:
        cv2.rectangle(img_gray, (x, y), (x+w, y+h), (0, 0, 0), -1)  # Fill the rectangle with black color
    return img_gray

# Different version of chase_predator
def move_toward_predator(x, w, frame_width):
    """
    Move the drone towards the predator based on its position in the frame.
    """
    predator_center = x + (w / 2)
    frame_center = frame_width / 2

    if predator_center < frame_center - frame_center * 0.1:
        move('left')
    elif predator_center > frame_center + frame_center * 0.1:
        move('right')
    else:
        move('forward')

def process_predators_and_uncertains(image, predators, uncertain_objects):
    current_time = time.time()

    for i, (x, y, w, h) in enumerate(predators):
        cv2.rectangle(image, (x, y), (x+w, y+h), (0, 0, 255), 2)
        move_toward_predator(x, w, image.shape[1])
        if i not in predator_intervals or current_time - predator_intervals[i] > DETECTION_INTERVAL:
            send_update_message(PREDATOR_ALERT_MSG)
            img_path = f"predator_detected_{i}_{time.time()}.jpg"
            cv2.imwrite(img_path, image)
            send_photo(img_path, PREDATOR_ALERT_MSG)
            predator_intervals[i] = current_time


def main_loop():
    # Capture a frame from the video stream
    ret, frame = cap.read()

    global bad_weather, weather_detection_reset, time_check_short, time_check_weather

    # Detect objects
    detect_objects(frame)

    # Get the GPS coordinates of the drone
    current_lat, current_long = GPS.getCoordinates()
    # Get the current time
    current_time = time.time()

    # Check if one minute has passed since last checked
    if current_time - time_check_short >= 60:
        # Update the last checked time
        time_check_short = current_time
        # Run the semi-frequent functions:
        get_local_weather(current_lat,current_long)


    # We reset the clock every time we know there's bad weather - if we make it to this point, we can assume good weather
    if current_time - time_check_weather >= WEATHER_TIME_RESET_LENGTH:
        # Update the last checked time
        time_check_weather = current_time
        bad_weather=False
        weather_detection_reset=True

    # Check if the drone is within the safety zone
    if not is_in_safety_zone(current_lat, current_long):
        # Stop the drone's flight and return to the starting location
        asyncio.ensure_future(returnToStart(home_location.lat, home_location.lon))

    # Display the current frame
    cv2.imshow("Drone View", frame)
    # Don't waste battery using the light - we can provide other lights without using the drone's.  The drone will use its light in flight.
    if not is_dark(0.5):
        turn_off_light()
    # Wait for the user to press 'q' to exit the program
    if cv2.waitKey(1) & 0xFF == ord('q'):
        global break_main_loop
        break_main_loop=True


# Main Loop

async def main():
    # Connect to the drone
    global drone
    drone = await connect_to_drone()

    # Set home location
    set_home_location()
    
    while True:
        if not break_main_loop:
            main_loop()
            # Yield to the event loop so any scheduled tasks (e.g. returnToStart) can run
            await asyncio.sleep(0)
        else:
            break

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# Release the video capture device and close all windows
cap.release()
cv2.destroyAllWindows()
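
For reference, the safety zone that set_safety_zone() saves and load_safety_zone() reads back is just an INI block in config.ini. A hand-written file in roughly this shape works as well (the coordinates below are placeholders, not real property corners):

[SafetyZone]
TopLeftLat = 40.000000
TopLeftLon = -83.000000
TopRightLat = 40.000000
TopRightLon = -82.999000
BottomLeftLat = 39.999000
BottomLeftLon = -83.000000
BottomRightLat = 39.999000
BottomRightLon = -82.999000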

opencv_trainer

Python
This is designed to automatically train OpenCV Haar cascade models
import os
import requests
import cv2
import subprocess

def download_images(query, num_images, output_dir, api_key):
    url = 'https://api.pexels.com/v1/search'
    params = {'query': query, 'per_page': num_images}
    headers = {'Authorization': api_key}
    
    response = requests.get(url, params=params, headers=headers)
    if response.status_code == 200:
        json_data = response.json()
        for i, photo in enumerate(json_data['photos']):
            img_url = photo['src']['large']
            img_filename = f'{query}_{i}.jpg'
            img_path = os.path.join(output_dir, img_filename)
            img_data = requests.get(img_url).content
            with open(img_path, 'wb') as handler:
                handler.write(img_data)
    else:
        print(f'Request failed with status code {response.status_code}')
        print(response.text)

def create_samples_file(samples_dir, output_file, positive=True):
    # Positive sample files use "path count x y w h" annotation lines;
    # negative (background) files are just one image path per line.
    with open(output_file, 'w') as handler:
        for filename in os.listdir(samples_dir):
            if filename.endswith('.jpg'):
                img_path = os.path.join(samples_dir, filename)
                if positive:
                    img = cv2.imread(img_path)
                    img_h, img_w = img.shape[:2]
                    handler.write(f'{img_path} 1 0 0 {img_w} {img_h}\n')
                else:
                    handler.write(f'{img_path}\n')

# Set up the Pexels API key
PEXELS_API_KEY = '(your api key)' #Pexels has been underwhelming - I'd recommend using a different option, but I haven't found an ideal one thus far

# Set up the list of objects to train on
objects = ['snake', 'big_cat', 'coyote', 'fox', 'raccoon', 'skunk', 'hawk', 'owl', 'deer', 'rabbit', 'squirrel', 'human', 'dog', 'chicken', 'duck', 'cat']

# Set up the directories to save the images
pos_dir = 'positive'
neg_dir = 'negative'
if not os.path.exists(pos_dir):
    os.makedirs(pos_dir)
if not os.path.exists(neg_dir):
    os.makedirs(neg_dir)

# Set up the number of images to download
num_images = 100

for obj in objects:
    print(f'Training Haar Cascade Classifier for {obj}...')

    # Set up the search query
    search_query = obj.replace('_', ' ') + ' without background'

    # Set up the positive sample directory
    pos_sample_dir = os.path.join(pos_dir, obj)
    if not os.path.exists(pos_sample_dir):
        os.makedirs(pos_sample_dir)

    # Download the positive images
    print(f'Downloading {num_images} positive images for {obj}...')
    download_images(search_query, num_images, pos_sample_dir, PEXELS_API_KEY)

    # Create the positive sample file
    pos_file = f'{obj}_pos.txt'
    create_samples_file(pos_sample_dir, pos_file)

    # Download the negative images if not downloaded
    if not os.listdir(neg_dir):
        print('Downloading negative images...')
        download_images('negative_samples', num_images, neg_dir, PEXELS_API_KEY)

    # Create the negative sample file (a plain list of background image paths)
    neg_file = 'neg.txt'
    create_samples_file(neg_dir, neg_file, positive=False)

    # Train the Haar Cascade Classifier
    num_stages = 20
    min_hit_rate = 0.999
    max_false_alarm_rate = 0.5
    num_pos = len(os.listdir(pos_sample_dir))
    num_neg = len(os.listdir(neg_dir))
    img_size = '64x64'

    # Call the train_cascade.bat script to train the classifiers
    train_cascade_command = f'train_cascade.bat {obj}_pos.txt neg.txt {obj}_cascade.xml {num_stages} {img_size} {min_hit_rate} {max_false_alarm_rate} {num_pos} {num_neg}'
    subprocess.run(train_cascade_command, shell=True)

    print(f'Haar Cascade Classifier training complete for {obj}.')
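
The train_cascade.bat script that the loop above calls isn't included with this listing. As a rough, untested sketch of what that wrapper could do with the same arguments (assuming OpenCV's opencv_createsamples and opencv_traincascade tools are on the PATH; the function name and file layout here are my own, not part of the project):

import os
import subprocess

def train_cascade(pos_file, neg_file, cascade_xml, num_stages, img_size,
                  min_hit_rate, max_false_alarm_rate, num_pos, num_neg):
    # Hypothetical stand-in for train_cascade.bat built on OpenCV's CLI tools
    w, h = img_size.split('x')
    vec_file = cascade_xml.replace('.xml', '.vec')
    data_dir = cascade_xml.replace('.xml', '_data')
    os.makedirs(data_dir, exist_ok=True)

    # Pack the annotated positive samples into a .vec file
    subprocess.run(['opencv_createsamples', '-info', pos_file, '-num', str(num_pos),
                    '-w', w, '-h', h, '-vec', vec_file], check=True)

    # Train the cascade against the list of negative (background) images;
    # the result is written to <data_dir>/cascade.xml
    subprocess.run(['opencv_traincascade', '-data', data_dir, '-vec', vec_file,
                    '-bg', neg_file, '-numPos', str(int(num_pos * 0.9)), '-numNeg', str(num_neg),
                    '-numStages', str(num_stages), '-w', w, '-h', h,
                    '-minHitRate', str(min_hit_rate), '-maxFalseAlarmRate', str(max_false_alarm_rate)],
                   check=True)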

wyze_cam_controller

Python
This controls a security camera that helps the drone monitor for predators
import cv2
import numpy as np
import time
from dronekit import connect, VehicleMode, LocationGlobalRelative

# Function to arm the drone and take off to the specified altitude
def arm_and_takeoff(altitude):
    while not vehicle.is_armable:
        print("Waiting for vehicle to become armable.")
        time.sleep(1)

    print("Arming motors")
    vehicle.mode = VehicleMode("GUIDED")
    vehicle.armed = True

    while not vehicle.armed:
        print("Waiting for arming...")
        time.sleep(1)

    print("Taking off!")
    vehicle.simple_takeoff(altitude)

    while True:
        print("Altitude: ", vehicle.location.global_relative_frame.alt)
        if vehicle.location.global_relative_frame.alt >= altitude * 0.95:
            print("Reached target altitude")
            break
        time.sleep(1)

# Initialize the drone
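# 'COM3' assumes the telemetry radio enumerates as a Windows COM port; on Linux,
# use the matching serial device (e.g. /dev/ttyUSB0) instead.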
vehicle = connect('COM3', wait_ready=True)

# Load the pre-trained Haar Cascade classifier
classifier = cv2.CascadeClassifier('predator_cascade.xml')

# Initialize the camera
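# The Wyze Cam v2 only exposes an RTSP stream when flashed with Wyze's RTSP firmware;
# replace the IP address (and add credentials if you set them) to match your camera.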
cap = cv2.VideoCapture('rtsp://192.168.1.108:554/onvif1')

# Define the minimum area for a detected object
min_area = 1000

# Variable to keep track of whether the drone is in the air
in_air = False
camera_lat = 1   # The Wyze Cam cannot determine its own coordinates - enter the camera's latitude here
camera_lng = 1   # ...and its longitude here
camera_altitude = 10  # Altitude (in metres) for the drone to fly at over the camera; tweak as needed
    
try:
    # Loop over frames from the camera
    while True:
        # Read the next frame from the camera
        ret, frame = cap.read()
        if not ret:
            # Skip this iteration if a frame could not be read
            continue

        # Resize the frame for faster processing
        frame_resized = cv2.resize(frame, None, fx=0.5, fy=0.5)

        # Convert the resized frame to grayscale
        gray = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2GRAY)

        # Detect objects in the grayscale frame
        objects = classifier.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        # Loop over the detected objects
        for (x, y, w, h) in objects:
            # Check if the object is large enough and the drone is not already in the air
            if w * h > min_area and not in_air:
                # Arm the drone and take off
                arm_and_takeoff(10)
                in_air = True

                # Send the drone to the camera's position, where the predator was spotted
                vehicle.simple_goto(LocationGlobalRelative(camera_lat, camera_lng, camera_altitude))

        # Display the frame with the detected objects
        cv2.imshow('frame', frame_resized)

        # Wait for a key press to exit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
except KeyboardInterrupt:
    print("Interrupted")
finally:
    # Clean up and release resources
    vehicle.close()
    cap.release()
    cv2.destroyAllWindows()

Credits

donutsorelse

I make different stuff every week of all kinds. Usually I make funny yet useful inventions.