Hansblix OwuorNickson Kiprotich
Published © GPL3+

Virtual Fencing for Mitigating Human Wildlife Conflict

An intelligent, IoT-driven virtual fence that leverages real-time ML object detection and radar sensing to proactively protect communities

IntermediateShowcase (no instructions)6 hours5
Virtual Fencing for Mitigating Human Wildlife Conflict

Things used in this project

Hardware components

Raspberry Pi 5
Raspberry Pi 5
×1
Ai Thinker RD-03D Radar module
×1
NEO-6M gps module
×1
GSM Module SIM800L with MIC & Headphone Jack
M5Stack GSM Module SIM800L with MIC & Headphone Jack
×1

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)

Story

Read more

Code

security_master_yolo.py

Python
#!/usr/bin/env python3

import serial
import time
import math
import pynmea2
import cv2
import threading
import glob
from ultralytics import YOLO
from datetime import datetime

# -------------------------------------------------
# CONFIGURATION
# -------------------------------------------------
RADAR_PORT = "/dev/ttyAMA0"
RADAR_BAUDRATE = 256000

GPS_port="/dev/ttyUSB1"
GPS_BAUDRATE = 9600

GSM_PORT = "/dev/ttyUSB0"
GSM_BAUDRATE = 9600

PHONE = "+254745524878"
SMS_COOLDOWN = 15
DETECTION_RANGE_MM = 8000

YOLO_MODEL_PATH = "best.pt"
FRAME_SKIP = 2

# -------------------------------------------------
# AUTO GPS PORT DETECTION
# -------------------------------------------------
def find_gps_port():
    ports = glob.glob("/dev/ttyUSB*")
    return ports[0] if ports else None

GPS_PORT = find_gps_port()
if not GPS_PORT:
    print(" No GPS USB device found!")
else:
    print(f" GPS detected on {GPS_PORT}")

# -------------------------------------------------
# GPS LOCKING THREAD
# -------------------------------------------------
class GPSReader(threading.Thread):
    def __init__(self):
        super().__init__(daemon=True)

        self.latitude = None
        self.longitude = None
        self.fix_acquired = False

        self.lock = threading.Lock()

        try:
            self.ser = serial.Serial(GPS_PORT, GPS_BAUDRATE, timeout=1)
            print(f" GPS connected ({GPS_PORT})")
        except Exception as e:
            print(f" GPS error: {e}")
            self.ser = None

        self.start()

    def run(self):
        while True:
            if not self.ser:
                time.sleep(1)
                continue

            try:
                line = self.ser.readline().decode("ascii", errors="ignore").strip()
                if not line.startswith(("$GPGGA", "$GPRMC")):
                    continue

                msg = pynmea2.parse(line)

                if msg.latitude and msg.longitude:
                    with self.lock:
                        self.latitude = msg.latitude
                        self.longitude = msg.longitude
                        self.fix_acquired = True

            except Exception:
                pass

            time.sleep(0.1)

    def get_coords(self):
        with self.lock:
            if self.fix_acquired:
                return f"{self.latitude:.6f}, {self.longitude:.6f}"
            return "GPS NO FIX"


# -------------------------------------------------
# RD03D RADAR
# -------------------------------------------------
class Target:
    def __init__(self, x, y):
        self.distance = math.sqrt(x**2 + y**2)

class RD03D:
    MULTI_TARGET_CMD = bytes([0xFD,0xFC,0xFB,0xFA,0x02,0x00,0x90,0x00,0x04,0x03,0x02,0x01])

    def __init__(self):
        try:
            self.ser = serial.Serial(RADAR_PORT, RADAR_BAUDRATE, timeout=0.1)
            self.buffer = b""
            self.targets = []
            time.sleep(3)
            self.ser.write(self.MULTI_TARGET_CMD)
            self.ser.flush()
            print(" Radar Initialized")
        except Exception as e:
            print(f" Radar Error: {e}")

    def update(self):
        if self.ser.in_waiting > 100:
            self.ser.reset_input_buffer()
            self.buffer = b""

        if self.ser.in_waiting:
            self.buffer += self.ser.read(self.ser.in_waiting)

        if len(self.buffer) > 400:
            self.buffer = self.buffer[-200:]

        start = self.buffer.rfind(b'\xAA\xFF')
        if start != -1 and len(self.buffer) >= start + 30:
            frame = self.buffer[start:start+30]
            if frame[-2:] == b'\x55\xCC':
                x = int.from_bytes(frame[5:7], "little", signed=True)
                y = int.from_bytes(frame[7:9], "little", signed=True)
                self.targets = [Target(x, y)]
                self.buffer = b""
                return True
        return False

    def get_target(self):
        return self.targets[0] if self.targets else None

# -------------------------------------------------
# SIM800L SMS
# -------------------------------------------------
def send_sms(text):
    try:
        ser = serial.Serial(GSM_PORT, GSM_BAUDRATE, timeout=1)
        ser.write(b"AT\r\n")
        time.sleep(0.3)
        ser.write(b"AT+CMGF=1\r\n")
        time.sleep(0.3)
        ser.write(f'AT+CMGS="{PHONE}"\r\n'.encode())
        time.sleep(0.3)
        ser.write((text + "\r\n").encode())
        ser.write(bytes([26]))
        time.sleep(3)
        ser.close()
        return True
    except Exception:
        return False

# -------------------------------------------------
# MAIN
# -------------------------------------------------
def main():
    print("[SYSTEM] Booting...")
    gps = GPSReader()
    radar = RD03D()

    cam = None
    for i in [0, 1]:
        c = cv2.VideoCapture(i)
        if c.isOpened():
            cam = c
            print(f" Camera found on index {i}")
            break

    print("[SYSTEM] Loading YOLO...")
    model = YOLO(YOLO_MODEL_PATH)

    last_sms = 0
    frame_count = 0

    print("[SYSTEM] SECURITY ACTIVE")

    try:
        while True:
            now = time.time()
            tstamp = datetime.now().strftime("%H:%M:%S")
            alert_sent = False

            # YOLO
            if cam:
                ret, frame = cam.read()
                if ret:
                    frame_count += 1
                    if frame_count % FRAME_SKIP == 0:
                        results = model(frame, verbose=False)
                        for r in results:
                            if r.boxes:
                                label = model.names[int(r.boxes.cls[0])]
                                if now - last_sms > SMS_COOLDOWN:
                                    coords = gps.get_coords()
                                    msg = f"[{tstamp}] AI ALERT: {label.upper()} @ {coords}"
                                    send_sms(msg)
                                    last_sms = now
                                    alert_sent = True
                                    print(f"[{tstamp}] YOLO: {label}")
                                break

            # RADAR
            if not alert_sent and radar.update():
                t = radar.get_target()
                if t and 0 < t.distance < DETECTION_RANGE_MM:
                    if now - last_sms > SMS_COOLDOWN:
                        dist_m = t.distance / 1000.0
                        coords = gps.get_coords()
                        msg = f"[{tstamp}] RADAR: HUMAN @ {dist_m:.2f}m, GPS: {coords}"
                        send_sms(msg)
                        last_sms = now
                        print(f"[{tstamp}] RADAR: {dist_m:.2f}m")

            time.sleep(0.05)

    except KeyboardInterrupt:
        print("\n[SYSTEM] Stopping...")
    finally:
        if cam:
            cam.release()

if __name__ == "__main__":
    main()

Credits

Hansblix Owuor
1 project • 1 follower
Nickson Kiprotich
15 projects • 52 followers
Iot, Robotics & TinyML Enthusiast

Comments