Mert Erbak
Published © Apache-2.0

Real-time Object Detection and Classification for Recycling

Ryzen AI-powered application that uses a camera to identify and classify various types of waste

Advanced · Work in progress · Over 8 days · 187

Things used in this project

Hardware components

Minisforum Venus UM790 Pro with AMD Ryzen™ 9
×1

Software apps and online services

AMD Ryzen™ AI
DroidCam
Connect your phone as a webcam for the camera feed (see the quick check below).
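
With DroidCam running, the phone appears as an extra webcam, so the live camera script can read it simply by changing the device index passed to OpenCV. A quick check, assuming the DroidCam feed shows up at index 1 (the actual index depends on your machine):

import cv2

# Index 1 is an assumption; index 0 is usually the built-in webcam
cap = cv2.VideoCapture(1)
ret, frame = cap.read()
if ret:
    print("Got a frame from the DroidCam device:", frame.shape)
else:
    print("No frame at index 1; try another device index")
cap.release()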

Story


Code

recycling_resnet.py

Python
Mask R-CNN network trained on the TACO dataset
import os
import argparse
import torch
import torchvision
from torchvision.models.detection import maskrcnn_resnet50_fpn
import torchvision.transforms.functional as F
from pycocotools.coco import COCO
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
import random

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-train", action='store_true')
    parser.add_argument("--num_epochs", type=int, default=10)
    parser.add_argument("--dataset_path", type=str, default="TACO")
    parser.add_argument("--batch_size", type=int, default=2)
    parser.add_argument("--learning_rate", type=float, default=0.00001)
    parser.add_argument("--split_number", type=int, default=0)
    args = parser.parse_args()
    return args

class TACODataset(torch.utils.data.Dataset):
    def __init__(self, root, transforms=None, split='train', split_number=0):
        self.root = root
        self.transforms = transforms
        self.split = split
        self.split_number = split_number
        
        annotation_file = os.path.join(root, f'annotations_{split_number}_{split}.json')
        if not os.path.exists(annotation_file):
            raise FileNotFoundError(f"Annotation file not found at {annotation_file}")
        
        self.coco = COCO(annotation_file)
        self.ids = list(self.coco.imgs.keys())

        self.cat_ids = self.coco.getCatIds()
        self.categories = {cat['id']: cat['name'] for cat in self.coco.loadCats(self.cat_ids)}

    def __getitem__(self, index):
        img_id = self.ids[index]
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        coco_annotation = self.coco.loadAnns(ann_ids)
        
        path = self.coco.loadImgs(img_id)[0]['file_name']
        img_path = os.path.join(self.root, path)

        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image file not found: {img_path}")

        img = Image.open(img_path).convert("RGB")
        
        num_objs = len(coco_annotation)
        if num_objs == 0:
            return None

        boxes = []
        masks = []
        labels = []
        
        img_width, img_height = img.size

        for i in range(num_objs):
            xmin = coco_annotation[i]['bbox'][0]
            ymin = coco_annotation[i]['bbox'][1]
            xmax = xmin + coco_annotation[i]['bbox'][2]
            ymax = ymin + coco_annotation[i]['bbox'][3]
            boxes.append([xmin, ymin, xmax, ymax])
            labels.append(coco_annotation[i]['category_id'])
            mask = self.coco.annToMask(coco_annotation[i])
            mask = Image.fromarray(mask).resize((img_width, img_height), Image.NEAREST)
            masks.append(np.array(mask))

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        masks = torch.as_tensor(np.array(masks), dtype=torch.uint8)
        image_id = torch.tensor([img_id])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        return len(self.ids)

class Compose:
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        for t in self.transforms:
            image, target = t(image, target)
        return image, target

class ToTensor(object):
    def __call__(self, image, target):
        image = F.to_tensor(image)
        return image, target

class RandomHorizontalFlip(object):
    def __init__(self, prob):
        self.prob = prob

    def __call__(self, image, target):
        if random.random() < self.prob:
            height, width = image.shape[-2:]
            image = image.flip(-1)
            bbox = target["boxes"]
            bbox[:, [0, 2]] = width - bbox[:, [2, 0]]
            target["boxes"] = bbox
            if "masks" in target:
                target["masks"] = target["masks"].flip(-1)
        return image, target

class Resize(object):
    def __init__(self, size):
        self.size = size

    def __call__(self, image, target):
        # Rescale the boxes along with the image and masks so annotations stay aligned
        orig_h, orig_w = image.shape[-2:]
        image = F.resize(image, self.size)
        if "boxes" in target and len(target["boxes"]) > 0:
            target["boxes"][:, [0, 2]] *= self.size[1] / orig_w
            target["boxes"][:, [1, 3]] *= self.size[0] / orig_h
        if "masks" in target:
            target["masks"] = F.resize(target["masks"], self.size)
        return image, target

def get_transform(train):
    transforms = []
    transforms.append(ToTensor())
    transforms.append(Resize((800, 800)))
    if train:
        transforms.append(RandomHorizontalFlip(0.5))
    return Compose(transforms)

def collate_fn(batch):
    batch = [b for b in batch if b is not None]
    return tuple(zip(*batch))

def init_weights(m):
    if isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

def train_model(model, data_loader_train, data_loader_val, device, num_epochs, lr):
    model.to(device)
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(params, lr=lr)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for images, targets in data_loader_train:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Debug: Print shapes of inputs and targets
            print(f"Images shape: {[img.shape for img in images]}")
            for target in targets:
                print(f"Target keys: {target.keys()}")
                for key, value in target.items():
                    print(f"Target {key} shape: {value.shape}")

            loss_dict = model(images, targets)

            if isinstance(loss_dict, dict):
                losses = sum(loss for loss in loss_dict.values())
            else:
                print("Loss dict is a list, check its content.")
                for i, loss in enumerate(loss_dict):
                    print(f"Loss {i} keys: {loss.keys()}")
                    for key, value in loss.items():
                        print(f"Loss {i} {key} shape: {value.shape}")
                losses = sum([sum(loss.values()) for loss in loss_dict])

            optimizer.zero_grad()
            losses.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            total_loss += losses.item()

        avg_loss = total_loss / len(data_loader_train)
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_loss:.4f}")

        # Keep the model in train mode: torchvision detection models only return
        # a loss dict while training; gradients are disabled below with no_grad.
        val_loss = 0
        with torch.no_grad():
            for images, targets in data_loader_val:
                images = list(image.to(device) for image in images)
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                # Debug: Print shapes of inputs and targets
                print(f"Validation Images shape: {[img.shape for img in images]}")
                for target in targets:
                    print(f"Validation Target keys: {target.keys()}")
                    for key, value in target.items():
                        print(f"Validation Target {key} shape: {value.shape}")

                loss_dict = model(images, targets)

                if isinstance(loss_dict, dict):
                    losses = sum(loss for loss in loss_dict.values())
                else:
                    print("Loss dict is a list, check its content.")
                    for i, loss in enumerate(loss_dict):
                        print(f"Validation Loss {i} keys: {loss.keys()}")
                        for key, value in loss.items():
                            print(f"Validation Loss {i} {key} shape: {value.shape}")
                    losses = sum([sum(loss.values()) for loss in loss_dict])

                val_loss += losses.item()

        avg_val_loss = val_loss / len(data_loader_val)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {avg_val_loss:.4f}")
        scheduler.step(avg_val_loss)

        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
        }
        os.makedirs("model", exist_ok=True)  # Ensure the checkpoint directory exists
        torch.save(checkpoint, f"model/maskrcnn_taco_checkpoint_epoch_{epoch+1}.pt")

    print("Training complete")
    torch.save(model.state_dict(), "model/maskrcnn_taco_litter_detection_final.pt")

def visualize_prediction(image, prediction, coco):
    fig, ax = plt.subplots(1, figsize=(12, 8))
    image = image.cpu().permute(1, 2, 0).numpy()
    ax.imshow(image)
    
    for box, label, score in zip(prediction['boxes'], prediction['labels'], prediction['scores']):
        if score > 0.5:
            box = box.cpu().numpy()
            rect = patches.Rectangle((box[0], box[1]), box[2]-box[0], box[3]-box[1], 
                                     linewidth=2, edgecolor='r', facecolor='none')
            ax.add_patch(rect)
            class_name = coco.loadCats(label.cpu().item())[0]['name']
            ax.text(box[0], box[1], f"{class_name}: {score:.2f}", color='white', 
                    bbox=dict(facecolor='red', alpha=0.5))
    
    plt.show()

def test_model(model, data_loader, device, coco):
    model.to(device)
    model.eval()
    
    for images, _ in data_loader:
        images = list(img.to(device) for img in images)
        
        with torch.no_grad():
            predictions = model(images)
        
        for img, prediction in zip(images, predictions):
            visualize_prediction(img, prediction, coco)

def main():
    args = get_args()
    
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    
    dataset_train = TACODataset(root=os.path.join(args.dataset_path, 'data'),
                                transforms=get_transform(train=True),
                                split='train',
                                split_number=args.split_number)
    
    dataset_val = TACODataset(root=os.path.join(args.dataset_path, 'data'),
                              transforms=get_transform(train=False),
                              split='val',
                              split_number=args.split_number)
    
    dataset_test = TACODataset(root=os.path.join(args.dataset_path, 'data'),
                               transforms=get_transform(train=False),
                               split='test',
                               split_number=args.split_number)
    
    data_loader_train = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=True, num_workers=4,
                                   collate_fn=collate_fn)
    
    data_loader_val = DataLoader(dataset_val, batch_size=1, shuffle=False, num_workers=4,
                                 collate_fn=collate_fn)
    
    data_loader_test = DataLoader(dataset_test, batch_size=1, shuffle=False, num_workers=4,
                                  collate_fn=collate_fn)
    
    coco = dataset_train.coco

    num_classes = len(dataset_train.categories) + 1  # +1 for background class

    if args.train:
        model = maskrcnn_resnet50_fpn(pretrained=True)
        in_features = model.roi_heads.box_predictor.cls_score.in_features
        model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)
        model.roi_heads.mask_predictor = torchvision.models.detection.mask_rcnn.MaskRCNNPredictor(256, 256, num_classes)
        
        model.roi_heads.box_predictor.apply(init_weights)
        model.roi_heads.mask_predictor.apply(init_weights)
        
        print(f"Number of classes: {num_classes}")
        print(f"Model structure: {model}")
        
        train_model(model, data_loader_train, data_loader_val, device, args.num_epochs, args.learning_rate)
    else:
        model = maskrcnn_resnet50_fpn(num_classes=num_classes)
        model.load_state_dict(torch.load("model/maskrcnn_taco_litter_detection_final.pt", map_location=device))
        test_model(model, data_loader_test, device, coco)

if __name__ == "__main__":
    main()
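
Going by the argparse defaults above, training and evaluation would be launched roughly like this, assuming the TACO split files annotations_<split_number>_<split>.json already exist under TACO/data:

python recycling_resnet.py -train --dataset_path TACO --num_epochs 10 --batch_size 2
python recycling_resnet.py --dataset_path TACO --split_number 0

The second call skips -train, so the script loads model/maskrcnn_taco_litter_detection_final.pt and visualizes predictions on the test split.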

YOLO_train.py

Python
import os
HOME = os.getcwd()
print(HOME)
!pip install ultralytics==8.0.20

from IPython import display
display.clear_output()

import ultralytics
ultralytics.checks()
from ultralytics import YOLO
from IPython.display import display, Image


!mkdir {HOME}/datasets
%cd {HOME}/datasets

!pip install roboflow --quiet

from roboflow import Roboflow
rf = Roboflow(api_key="YOUR_ROBOFLOW_API_KEY")  # Use your own Roboflow API key here
project = rf.workspace("roboflow-universe-projects").project("taco-object-detection-kcxyn")
dataset = project.version(2).download("yolov8")

%cat {dataset.location}/data.yaml

%cd {HOME}

!yolo task=detect mode=train model=yolov8m.pt data={dataset.location}/data.yaml epochs=30 imgsz=640 plots=True

!ls {HOME}/runs/detect/train/

%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train/confusion_matrix.png', width=600)

%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train/results.png', width=600)

%cd {HOME}
Image(filename=f'{HOME}/runs/detect/train/val_batch0_pred.jpg', width=600)

%cd {HOME}

!yolo task=detect mode=val model={HOME}/runs/detect/train/weights/best.pt data={dataset.location}/data.yaml


%cd {HOME}
!yolo task=detect mode=predict model={HOME}/runs/detect/train/weights/best.pt conf=0.25 source={dataset.location}/test/images save=True


import glob
from IPython.display import Image, display

for image_path in glob.glob(f'{HOME}/runs/detect/predict/*.jpg')[:3]:
    display(Image(filename=image_path, width=600))
    print("\n")
%pwd
%ls

from ultralytics import YOLO

model = YOLO('runs/detect/train/weights/best.pt')

model.export(format='onnx')
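
The export above writes the ONNX file next to the PyTorch weights (by default runs/detect/train/weights/best.onnx), while recycling_quantize.py below expects it at models/best.onnx relative to the script, so copy it over before quantizing. A small sanity check of the exported model, with the path assumed to be the default Ultralytics location:

import onnxruntime as ort

# Adjust the path if your training run directory differs
session = ort.InferenceSession('runs/detect/train/weights/best.onnx',
                               providers=['CPUExecutionProvider'])
inp = session.get_inputs()[0]
print(inp.name, inp.shape)  # Expected roughly: 'images', [1, 3, 640, 640]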

recycling_quantize.py

Python
import argparse
import torch
import numpy as np
import onnx
import onnxruntime as ort
import vai_q_onnx
from onnxruntime.quantization import QuantFormat, QuantType, CalibrationDataReader
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import cv2
from pathlib import Path
import os

# Define the paths
script_dir = os.path.dirname(os.path.abspath(__file__))
models_dir = os.path.join(script_dir, 'models')
onnx_model_path = os.path.join(models_dir, 'best.onnx')

class RecyclingDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = Path(data_dir)
        self.image_dir = self.data_dir / 'images'
        self.label_dir = self.data_dir / 'labels'
        self.transform = transform

        self.image_files = list(self.image_dir.glob('*.jpg'))  # Adjust file extension if needed
        print(f"Found {len(self.image_files)} images in {self.image_dir}")

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = str(self.image_files[idx])
        label_path = str(self.label_dir / (self.image_files[idx].stem + '.txt'))

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        if self.transform:
            image = self.transform(image)
        
        # Read labels
        labels = self.read_labels(label_path, img_path)
        
        return image, labels
    
    def read_labels(self, label_path, img_path):
        if os.path.exists(label_path):
            try:
                with open(label_path, 'r') as f:
                    lines = f.readlines()
                    labels = []
                    for line in lines:
                        parts = list(map(float, line.strip().split()))
                        if len(parts) >= 6:
                            class_id = int(parts[0])
                            polygon = np.array(parts[1:]).reshape(-1, 2)
                            x_min, y_min = polygon.min(axis=0)
                            x_max, y_max = polygon.max(axis=0)
                            x_center = (x_min + x_max) / 2
                            y_center = (y_min + y_max) / 2
                            width = x_max - x_min
                            height = y_max - y_min
                            labels.append([class_id, x_center, y_center, width, height])
                        else:
                            print(f"Warning: Invalid label format in file {label_path}")
                    labels = np.array(labels)
            except Exception as e:
                print(f"Error reading label file {label_path}: {str(e)}")
                labels = np.zeros((1, 5))  # Placeholder label
        else:
            print(f"Warning: Label file not found for {img_path}")
            labels = np.zeros((1, 5))  # Placeholder label
        
        return labels

def prepare_dataset(data_dir, batch_size=1, quantization=False):
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((640, 640)),  # Adjust size as needed
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    actual_batch_size = 1 if quantization else batch_size
    
    # Use a smaller subset of the validation data for calibration
    subset = 'valid' if quantization else 'train'
    
    dataset_path = os.path.join(data_dir, subset)
    print(f"Loading dataset from: {dataset_path}")
    
    dataset = RecyclingDataset(dataset_path, transform=transform)
    
    if len(dataset) == 0:
        raise ValueError(f"No images found in {os.path.join(dataset_path, 'images')}")
    
    dataloader = DataLoader(dataset, batch_size=actual_batch_size, shuffle=not quantization)
    
    print(f"Created dataloader with {len(dataset)} images")
    print(f"Image directory: {dataset.image_dir}")
    print(f"Label directory: {dataset.label_dir}")
    
    # Print first few file names
    print("First few image files:")
    for img_file in dataset.image_files[:5]:
        print(f"  {img_file}")
    
    return dataloader

class YOLOCalibrationDataReader(CalibrationDataReader):
    def __init__(self, data_loader):
        super().__init__()
        self.data_loader = data_loader
        self.iterator = iter(self.data_loader)

    def get_next(self) -> dict:
        try:
            images, _ = next(self.iterator)
            return {"images": images[0].unsqueeze(0).numpy()}
        except StopIteration:
            return None

def yolo_calibration_reader(data_loader):
    return YOLOCalibrationDataReader(data_loader)

def quantize(quantize_loader, model_name):
    print(f"Quantizing {model_name}...")
    onnx_model_path = os.path.join(models_dir, 'best.onnx')
    onnx_model = onnx.load(onnx_model_path)
    onnx.checker.check_model(onnx_model)
    
    input_model_path = onnx_model_path
    output_model_path = os.path.join(models_dir, f"{model_name}_recycling_detection.qdq.U8S8.onnx")
    
    data_reader = yolo_calibration_reader(quantize_loader)
    
    try:
        vai_q_onnx.quantize_static(
            input_model_path,
            output_model_path,
            data_reader,
            quant_format=QuantFormat.QDQ,
            calibrate_method=vai_q_onnx.PowerOfTwoMethod.MinMSE,
            activation_type=QuantType.QUInt8,
            weight_type=QuantType.QInt8,
            enable_ipu_cnn=True,
            extra_options={'ActivationSymmetric': True}
        )
        print(f"Quantized Model Saved at {output_model_path}")
    except Exception as e:
        print(f"Error during quantization: {str(e)}")
        # Add more debug information
        print("Model input shape:", onnx_model.graph.input[0].type.tensor_type.shape)
        print("First batch from data_reader:")
        first_batch = data_reader.get_next()
        if first_batch:
            print("Shape:", first_batch['images'].shape)
            print("Type:", first_batch['images'].dtype)
        else:
            print("No data from data_reader")

def load_quantized_model(model_path):
    session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
    return session

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("-model", type=str, default='yolo')
    parser.add_argument("--data_dir", type=str, required=True, help="Path to the dataset directory")
    args = parser.parse_args()
    return args

def main():
    args = get_args()
    
    print(f"Data directory: {args.data_dir}")
    print(f"Model name: {args.model}")
    
    try:
        print("Preparing quantization dataset...")
        quantize_loader = prepare_dataset(args.data_dir, quantization=True)
        print("Quantizing model...")
        quantize(quantize_loader, args.model)
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()
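
With the exported model in place at models/best.onnx, quantization would be invoked along these lines (the dataset directory is a placeholder and must contain valid/images and valid/labels in the YOLO layout produced by the Roboflow export):

python recycling_quantize.py -model yolo --data_dir path/to/taco-dataset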

live_camera_recycling.py

Python
Live camera inference with the quantized YOLO model
import cv2
import numpy as np
import onnx
import onnxruntime as ort
from pathlib import Path

# Load the quantized YOLO model
quantized_model_path = r'C:\Users\merte\OneDrive\Desktop\Resnet-v2\models\quantized_model.onnx'
model = onnx.load(quantized_model_path)

# Set up ONNX Runtime session
providers = ['CPUExecutionProvider']
provider_options = [{}]
session = ort.InferenceSession(model.SerializeToString(), providers=providers,
                               provider_options=provider_options)

# Class names for TACO dataset (59 classes)
class_names = ['Aerosol', 'Aluminium blister pack', 'Aluminium foil', 'Battery', 'Broken glass', 
               'Carded blister pack', 'Cigarette', 'Clear plastic bottle', 'Corrugated carton', 
               'Crisp packet', 'Disposable food container', 'Disposable plastic cup', 'Drink can', 
               'Drink carton', 'Egg carton', 'Foam cup', 'Foam food container', 'Food Can', 
               'Food waste', 'Garbage bag', 'Glass bottle', 'Glass cup', 'Glass jar', 
               'Magazine paper', 'Meal carton', 'Metal bottle cap', 'Metal lid', 'Normal paper', 
               'Other carton', 'Other plastic bottle', 'Other plastic container', 'Other plastic cup', 
               'Other plastic wrapper', 'Other plastic', 'Paper bag', 'Paper cup', 'Paper straw', 
               'Pizza box', 'Plastic bottle cap', 'Plastic film', 'Plastic glooves', 'Plastic lid', 
               'Plastic straw', 'Plastic utensils', 'Polypropylene bag', 'Pop tab', 'Rope & strings', 
               'Scrap metal', 'Shoe', 'Single-use carrier bag', 'Six pack rings', 'Spread tub', 
               'Squeezable tube', 'Styrofoam piece', 'Tissues', 'Toilet tube', 'Tupperware', 
               'Unlabeled litter', 'Wrapping paper']

def preprocess_image(image, input_size=(640, 640)):
    original_height, original_width = image.shape[:2]
    
    # Resize and pad image
    ratio = min(input_size[0] / original_width, input_size[1] / original_height)
    new_size = (int(original_width * ratio), int(original_height * ratio))
    resized = cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)
    
    padded = np.full((input_size[0], input_size[1], 3), 114, dtype=np.uint8)
    padded[:new_size[1], :new_size[0]] = resized
    
    # Normalize and change to CHW format
    padded = padded.astype(np.float32) / 255.0
    padded = padded.transpose(2, 0, 1)
    
    return np.expand_dims(padded, axis=0), (original_height, original_width), new_size
def postprocess(output, orig_shape, new_size, conf_threshold=0.5, iou_threshold=0.45):
    predictions = np.squeeze(output[0])
    
    # Assuming the output is in the format [x, y, w, h, conf, class_scores]
    num_classes = min(predictions.shape[1] - 5, len(class_names))
    
    # Apply sigmoid to confidence scores and class scores
    scores = 1 / (1 + np.exp(-predictions[:, 4]))
    class_scores = 1 / (1 + np.exp(-predictions[:, 5:5+num_classes]))
    
    # Get predicted classes
    class_ids = np.argmax(class_scores, axis=1)
    
    # Filter based on confidence threshold and valid class range
    mask = (scores > conf_threshold) & (class_ids < num_classes)
    boxes = predictions[mask, :4]
    scores = scores[mask]
    class_ids = class_ids[mask]
    
    # Convert boxes from [x, y, w, h] to [x1, y1, x2, y2]
    boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
    boxes[:, 3] = boxes[:, 1] + boxes[:, 3]
    
    # Apply NMS
    indices = cv2.dnn.NMSBoxes(boxes, scores, conf_threshold, iou_threshold)
    
    if len(indices) > 0:
        indices = indices.flatten()
        boxes = boxes[indices]
        scores = scores[indices]
        class_ids = class_ids[indices]
        
        # Scale boxes back to original image
        input_width, input_height = new_size  # new_size is (width, height)
        orig_height, orig_width = orig_shape
        
        scale = min(input_width / orig_width, input_height / orig_height)
        offset_x = (input_width - orig_width * scale) / 2
        offset_y = (input_height - orig_height * scale) / 2
        
        boxes[:, [0, 2]] = (boxes[:, [0, 2]] - offset_x) / scale
        boxes[:, [1, 3]] = (boxes[:, [1, 3]] - offset_y) / scale
        
        # Additional filtering based on box size (optional)
        valid_detections = ((boxes[:, 2] - boxes[:, 0]) > 20) & ((boxes[:, 3] - boxes[:, 1]) > 20)
        boxes = boxes[valid_detections]
        scores = scores[valid_detections]
        class_ids = class_ids[valid_detections]
        
        return boxes, scores, class_ids
    else:
        return [], [], []

# Update the draw_detections function to include more information:
def draw_detections(frame, boxes, scores, class_ids):
    for box, score, class_id in zip(boxes, scores, class_ids):
        x1, y1, x2, y2 = box.astype(int)
        
        # Debug print
        print(f"class_id: {class_id}, score: {score:.4f}, box: {box}")
        
        # Check if class_id is valid
        if 0 <= class_id < len(class_names):
            label = f'{class_names[class_id]}: {score:.2f}'
        else:
            label = f'Unknown ({class_id}): {score:.2f}'
        
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        
        # Draw the label background
        label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        cv2.rectangle(frame, (x1, y1 - label_size[1] - 10), (x1 + label_size[0], y1), (0, 255, 0), -1)
        
        # Draw the label text
        cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

def main():
    cap = cv2.VideoCapture(0)  # Open the default camera 
    
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to capture frame")
            break

        # Preprocess the frame
        input_data, orig_shape, new_size = preprocess_image(frame)
        
        # Run inference
        try:
            outputs = session.run(None, {'images': input_data})
            
            print(f"Model output shape: {outputs[0].shape}")
            print(f"Model output type: {outputs[0].dtype}")
            print(f"Model output min: {outputs[0].min()}, max: {outputs[0].max()}")
            
            boxes, scores, class_ids = postprocess(outputs[0], orig_shape, new_size)
            
            print(f"Number of detections: {len(boxes)}")
            print(f"Class IDs: {class_ids}")
            print(f"Scores: {scores}")
            print(f"Boxes: {boxes}")
            
            draw_detections(frame, boxes, scores, class_ids)
        
        except Exception as e:
            print(f"Error during inference or postprocessing: {e}")
              
        # Display the frame
        cv2.imshow('Recycling Classification', frame)
        
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    # Release the capture and close windows
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()
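
The script above creates its session with CPUExecutionProvider. To actually run the quantized model on the Ryzen AI NPU, the session would instead use the Vitis AI execution provider from the Ryzen AI software stack. A minimal sketch, assuming the Ryzen AI SDK is installed and that vaip_config.json from that installation is reachable (both the model filename and the config path are assumptions):

import onnxruntime as ort

# Output filename produced by recycling_quantize.py with -model yolo
quantized_model_path = 'models/yolo_recycling_detection.qdq.U8S8.onnx'

# 'config_file' must point to the vaip_config.json shipped with the Ryzen AI software;
# its exact location depends on your installation.
session = ort.InferenceSession(
    quantized_model_path,
    providers=['VitisAIExecutionProvider'],
    provider_options=[{'config_file': 'vaip_config.json'}]
)
print(session.get_providers())  # Should list VitisAIExecutionProvider if the NPU is in use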

Credits

Mert Erbak
A forward-thinking and dynamic student with a deep-rooted passion for Artificial Intelligence, Finance, and future trends.
Thanks to babritb-bot.
