Shinji
Published under the GPL3+ license

Smart Bird Watching with RTSP, AI(YOLOv8) and Telegram Bot

Reconstructing a circular ecosystem: Building an AI bird watcher with YOLOv8 to track nature's 'phosphorus messengers'.

Intermediate · Full instructions provided · 10 hours · 124 views
Smart Bird Watching with RTSP, AI(YOLOv8) and Telegram Bot

Things used in this project

Hardware components

Hiseeu, W-NVR, K8216-W6
×1

Software apps and online services

Debian
OpenCV
Telegram Bot
YOLOv8
Microsoft VS Code

Story

Read more

Schematics

Schematic

Code

main_loop

Python
By isolating the reconnection strategy into a dedicated resilient loop, the code stays elegant while gaining the strength to recover autonomously.
    # -- Loop-local state ---------------------------------------------------
    # last_perm_check: epoch seconds of the last permission-file read (0 forces
    # an immediate first check).  is_allowed: current birdwatching permission.
    last_perm_check = 0
    is_allowed = True

    # Per-class confidence thresholds and display names (COCO ids:
    # Person=0, Bird=14, Cat=15, Dog=16).  Hoisted out of the loop — they are
    # invariant, so there is no need to rebuild them every iteration.
    thresholds = {0: INFERENCE_CONF_PERSON,
                  14: INFERENCE_CONF_BIRD,
                  15: INFERENCE_CONF_CAT,
                  16: INFERENCE_CONF_DOG}
    names = {0: "Person", 14: "Bird", 15: "Cat", 16: "Dog"}

    # Main monitoring loop: Analyze frames at ~3s intervals.
    # NOTE(review): cap, model, prev_roi_gray, detected_previously and the
    # UPPER_CASE constants are assumed to be defined earlier in this function
    # or at module level — confirm against the full source.
    while cap.isOpened():
        current_time = time.time()
        # Poll the permission file at the normal cadence while active, and at
        # the (shorter) sheep-counting cadence while sleeping.
        check_interval = PERM_CHECK_INTERVAL if is_allowed else SHEEP_COUNTING_INTERVAL

        # If is_allowed, nothing will be done before PERM_CHECK_INTERVAL
        if (current_time - last_perm_check) > check_interval:
            try:
                # Check permission after the selected "INTERVAL"
                with open(PERMISSION_FILE, 'r') as f:
                    perm = json.load(f)
                    new_status = perm.get("birdwatching", True)

                # Notify on state *transitions* only, never on every poll.
                if new_status != is_allowed:
                    if not new_status:
                        send_telegram_text(f"High wind ({perm.get('wind_speed')}m/s). \n BirdWatcher is going to sleep (Zzz...)")
                    else:
                        send_telegram_text("Wind calmed down. \n BirdWatcher is waking up!")

                is_allowed = new_status
                last_perm_check = current_time
            except Exception:
                # Permission file missing/corrupt: back off only 5s before the
                # next read attempt instead of waiting a full interval.
                last_perm_check = current_time - (check_interval - 5)

        # If not is_allowed, count sheep and get back to the top of this loop
        if not is_allowed:
            time.sleep(SHEEP_COUNTING_INTERVAL)
            continue

        found_labels = set()
        boxes = None

        # Skip ~1s of frames to stay current with the live stream
        for _ in range(FRAME_SKIP):
            cap.grab()

        # Happy Path: Decode the latest frame
        ret, frame = cap.retrieve()

        if not ret:
            # Error Path: retry logic (10s interval).
            # The wireless signal from a camera can be interfered with by
            # microwave ovens.
            retry_count = 0
            max_retries = 30  # 10 x 30 = 300 [sec.] -> 5 [min.]
            while retry_count < max_retries:
                retry_count += 1
                print(f"Frame retrieval failed. Retrying... ({retry_count}/{max_retries})")

                # Alert once at the start of the outage, not 30 times.
                if retry_count == 1:
                    send_telegram_text("Signal disturbance detected. Initiating recovery...")

                # Full reconnect: drop the handle, wait ~10s total, reopen.
                cap.release()
                time.sleep(1) # Wait a second
                cap.open(RTSP_URL)
                time.sleep(9)  # Count to 10

                # Grab a few times to refresh buffer
                for _ in range(5):
                    cap.grab()

                ret, frame = cap.retrieve()
                if ret:
                    # Restored successfully
                    send_telegram_text(f"Connection restored after {retry_count} attempts.")
                    break # Back to Happy Path

        if not ret:
            # Give up and let systemd handle it.
            # BUGFIX: this was `continue`, which jumped back to the top of the
            # loop and retried forever — the process never exited, so systemd's
            # Restart= policy (which the messages below promise) never fired.
            # `break` leaves the loop, runs cap.release(), and lets the
            # function return so systemd can restart the service.
            send_telegram_text("Retries exhausted. Handing over to systemd.")
            print("Max retries reached. Exiting for systemd to take over.")
            time.sleep(30)
            break  # Exit main loop to trigger systemd restart

        # 0. Trim the ROI and build a blurred grayscale copy for differencing
        roi_frame = frame[ROI_Y1:ROI_Y2, ROI_X1:ROI_X2]
        current_roi_gray = cv2.cvtColor(roi_frame, cv2.COLOR_BGR2GRAY)
        current_roi_gray = cv2.GaussianBlur(current_roi_gray, (21, 21), 0)

        motion_detected = False
        if prev_roi_gray is not None:
            # Calculate difference from the previous frame
            frame_diff = cv2.absdiff(prev_roi_gray, current_roi_gray)
            _, thresh = cv2.threshold(frame_diff, DIFF_THRESHOLD, 255, cv2.THRESH_BINARY)

            # Flag motion only within a plausible band: above the noise floor
            # and below a whole-frame-change ceiling (e.g. lighting shifts).
            diff_sum = np.sum(thresh)
            max_possible_diff = (ROI_X2 - ROI_X1) * (ROI_Y2 - ROI_Y1) * 255
            if MOTION_LOWER_LIMIT < diff_sum < (max_possible_diff * MOTION_UPPER_FACTOR):
                motion_detected = True

        # Update for the next loop
        prev_roi_gray = current_roi_gray.copy()

        # 1. Run inference only when motion was seen, with a broad confidence
        # threshold; per-class filtering happens below.
        # Targets: Person(0), Bird(14), Cat(15), Dog(16)
        if motion_detected:
            results = model.predict(roi_frame, conf=INFERENCE_CONF, imgsz=1280,
                                augment=True, classes=[0, 14, 15, 16], verbose=False)
            boxes = results[0].boxes

            # 2. Filter detections based on class-specific thresholds
            if boxes is not None:
                for box in boxes:
                    cls_id = int(box.cls[0])
                    conf = float(box.conf[0])
                    coords = box.xyxy[0].tolist()     # float coordinates
                    b_x1, b_y1, b_x2, b_y2 = map(int, coords) # integer coordinates
                    # Calculate bounding box area in pixels
                    area = (b_x2 - b_x1) * (b_y2 - b_y1)
                    # Convert coordinates from ROI-relative to full-frame
                    gx1, gy1 = b_x1 + ROI_X1, b_y1 + ROI_Y1
                    gx2, gy2 = b_x2 + ROI_X1, b_y2 + ROI_Y1
                    # Get specific threshold for this class, defaulting to 0.5
                    target_threshold = thresholds.get(cls_id, 0.5)
                    if conf < target_threshold:
                        continue
                    # Map class ID to label name, with "Unknown" as a safety fallback
                    label_name = names.get(cls_id, "Unknown")
                    # -1. Filter out undersized 'Large' objects (e.g., wind-blown pots) ---
                    if label_name in ["Person", "Dog", "Cat"] and area < MIN_SIZE_LARGE_OBJ:
                        # Ignore small detections that are likely noise
                        continue
                    # -2. Filter out oversized 'Small' objects (e.g., large crows or misidentified cats) ---
                    if label_name == "Bird" and area > MAX_SIZE_SMALL_BIRD:
                        # Only accept small-to-medium birds as "Bird"
                        continue
                    # Draw bounding box
                    cv2.rectangle(frame, (gx1, gy1), (gx2, gy2), (0, 0, 255), 3)
                    # Register the validated label for Telegram notification
                    found_labels.add(label_name)

        # 3. Handle notifications based on detection state changes: notify
        # once when targets appear, re-arm once the frame is clear again.
        has_valid_target = len(found_labels) > 0

        if has_valid_target and not detected_previously:
            # Format message and save captured frame
            labels_str = ", ".join(found_labels)
            msg = f"Target confirmed: {labels_str} in the garden!"
            photo_path = "detected_photo.jpg"
            cv2.imwrite(photo_path, frame)

            # Send notification and log to console
            send_telegram_photo(photo_path, msg)
            print(msg)

            detected_previously = True

        elif not has_valid_target:
            # Reset detection flag when targets leave the frame
            detected_previously = False

        # Pace the loop; the ~1s of skipped frames above already consumed time.
        time.sleep(max(0, LOOP_INTERVAL - 1))

    cap.release()

Link_to_GitHub

AI Bird Watching python code with OpenCV and YOLOv8

Credits

Shinji
2 projects • 0 followers
Ph. D. in Engineering, Optics, 3D Holograms. Designer for High Power Laser Manufacturing Systems. Project Leader regarding Metal AM Monitor.

Comments