LUcfarmer6
Created February 11, 2021 © MIT

Aerial Social Distancing Monitoring with Drones

As colleges and universities begin to reopen, monitoring social distancing during outdoor events is required.

Intermediate • Full instructions provided • Over 1 day • 157

Things used in this project

Hardware components

Turnigy 5000mAh 3S 40C Lipo Pack w/XT-90
×1
NXP KIT-RDDRONEK66;
×1
NXP 8MMNavQ.
×1

Software apps and online services

VS Code
Microsoft VS Code
PX4
PX4
PX4
PX4
QGroundControl
PX4 QGroundControl
QGroundControl
PX4 QGroundControl

Story

Read more

Code

Video Recording

Python
# Package Imports
import numpy as np
import cv2 as cv
import time
# Set up the capture object: a GStreamer pipeline that reads 640x480 raw
# video from the V4L2 camera and hands the frames to OpenCV through appsink
cap = cv.VideoCapture('v4l2src ! video/x-raw,width=640,height=480 ! decodebin ! videoconvert ! appsink', cv.CAP_GSTREAMER)

# Define the codec and create VideoWriter object
fourcc = cv.VideoWriter_fourcc(*'XVID')
out = cv.VideoWriter('flight.avi', fourcc, 15.0, (640,  480))

# Define Flight Time in seconds
dur = 60*1.5

# Report a camera that failed to open instead of silently recording nothing
if not cap.isOpened():
    print("Can't open camera stream. Exiting ...")

# Start Recording and Timer
print("Starting recording")
# The monotonic clock is immune to wall-clock adjustments made while recording
start = time.monotonic()
try:
    while cap.isOpened() and time.monotonic()-start < dur:
        ret, frame = cap.read()
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break
        out.write(frame)
finally:
    # Always release both objects, even on Ctrl+C or an unexpected error,
    # so the camera is freed and the video file is closed
    cap.release()
    out.release()

Train Retina Net

Python
The Python script used to train the RetinaNet model
# # Package Imports
import torch
import torchvision as tv
import pandas as pd
import os
import numpy as np
from torchvision import models, io, transforms, utils
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import time
import copy
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torchvision.ops.feature_pyramid_network import LastLevelP6P7
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone

# Pick the compute device: the first CUDA GPU when CUDA is usable, else the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

# Define Custom Dataset
class StanfordDroneDataset():
    """Image/annotation dataset backed by two CSV files.

    ``<model_dir>model_data/<type>_annotations.csv`` holds one bounding box
    per row as ``path, x1, y1, x2, y2, category`` (no header row).
    ``<model_dir>model_data/labels.csv`` maps each ``category`` to its
    integer ``label`` (no header row).

    Every row that shares a ``path`` is gathered into one dataset item, so
    each image appears exactly once together with all of its annotations.
    """

    # Create initialization object
    def __init__(self, transform, type, model_dir):
        """Read the CSV files and group the annotations by image path.

        Args:
            transform: Callable applied to the PIL image in ``__getitem__``.
            type: Split name selecting ``<type>_annotations.csv``.
            model_dir: Dataset root. It is combined with the relative paths
                by plain string concatenation, so it has to end with a path
                separator.

        Raises:
            KeyError: If an annotation row uses a category that is not
                listed in ``labels.csv``.
        """
        self.model_dir = model_dir
        self.transform = transform

        # Import annotations and bounding boxes
        filename = self.model_dir+'model_data/'+type+'_annotations.csv'
        data_df = pd.read_csv(filename, header=None, names=['path', 'x1', 'y1', 'x2', 'y2', 'category'])

        # Import labels as a plain category -> label mapping
        label_file = self.model_dir+'model_data/labels.csv'
        labels_df = pd.read_csv(label_file, header=None, names=['category', 'label'])
        label_lookup = labels_df.set_index('category')['label'].to_dict()

        full_paths = data_df['path'].to_list()
        # torchvision detection models document the boxes as float tensors
        full_boxes = torch.tensor(data_df.loc[:, 'x1':'y2'].values, dtype=torch.float32)
        labels_list = [label_lookup[category] for category in data_df['category']]
        full_labels = torch.tensor(labels_list).to(torch.long)

        # Collect one {"boxes", "labels"} dictionary per annotation row under
        # its image path; dict insertion order keeps the CSV order of images
        grouped = {}
        for path, box, label in zip(full_paths, full_boxes, full_labels):
            grouped.setdefault(path, []).append({"boxes": box, "labels": label})

        self.paths = list(grouped)
        self.targets = list(grouped.values())

    # Define getitem
    def __getitem__(self, idx):
        """Return ``(image, target)`` for item ``idx``.

        ``image`` is the transformed RGB image and ``target`` is the list of
        per-annotation dictionaries built in ``__init__``.
        """
        # Define full path
        img_path = self.model_dir+self.paths[idx]

        # Read image; the context manager closes the file once it is decoded
        with Image.open(img_path) as handle:
            img_pil = handle.convert("RGB")

        # Transform image
        img = self.transform(img_pil)
        return img, self.targets[idx]

    # Get length of dataset
    def __len__(self):
        """Return the number of distinct images in the annotation file."""
        return len(self.paths)

# Image preprocessing: the only step is the conversion to a tensor
normalize = transforms.Compose([transforms.ToTensor()])

phases = ['train']
data_root = r'/home/carson/research/drone/aerial_image_recognition/'

#* For every phase build the dataset, its DataLoader and record its size
dataset = {}
dataloader = {}
dataset_sizes = {}
for split in phases:
    dataset[split] = StanfordDroneDataset(normalize, split, data_root)
for split in phases:
    dataloader[split] = torch.utils.data.DataLoader(dataset[split], batch_size=1, shuffle=True)
for split in phases:
    dataset_sizes[split] = len(dataset[split])
print('done loading data')

# Build the detector: torchvision RetinaNet with 7 classes, created without
# pretrained detection weights
model = models.detection.retinanet_resnet50_fpn(
    num_classes=7,
    pretrained=False,
    pretrained_backbone=True,
)

#! Smaller anchors: for each base size use the size itself plus the two
#! intermediate scales 2^(1/3) and 2^(2/3), truncated to integers
base_sizes = [16, 32, 64, 128, 256]
anchor_sizes = tuple(
    (size, int(size * 2 ** (1.0 / 3)), int(size * 2 ** (2.0 / 3)))
    for size in base_sizes
)
# The same three aspect ratios are used for every entry of anchor_sizes
aspect_ratios = tuple((0.5, 1.0, 2.0) for _ in anchor_sizes)
anchor_generator = AnchorGenerator(anchor_sizes, aspect_ratios)

#! Install the custom anchor generator, then swap the backbone for a
#! pretrained resnet18 FPN (layers 2-4 plus the extra P6/P7 block)
model.anchor_generator = anchor_generator
model.backbone = resnet_fpn_backbone(
    'resnet18',
    pretrained=True,
    returned_layers=[2, 3, 4],
    trainable_layers=0,
    extra_blocks=LastLevelP6P7(256, 256),
)

# Count only the parameters that still require gradients
pytorch_total_params = 0
for param in model.parameters():
    if param.requires_grad:
        pytorch_total_params += param.numel()
print("trainable parameters: ", pytorch_total_params)
print("MODEL LOADED")

# Training hyperparameters
num_epochs = 10
batch_size = 1

# Prefix for every saved checkpoint file -- set it to a personal directory
save_dir = ""

# Adam over all model parameters, with a ReduceLROnPlateau scheduler in
# 'min' mode attached to it
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min')

# Mean per-image loss of every finished epoch, in epoch order
running_loss = []
phase = 'train'

# NOTE(review): `device` is computed earlier in this script, but neither the
# model nor the batches are moved to it, so everything runs where it was
# created -- confirm whether that is intended.

# Begin Training model
for epoch in range(num_epochs):
    print('epoch {}/{}'.format(epoch, num_epochs - 1))
    print('-' * 10)
    model.train()
    tot_len = len(dataloader[phase])
    print("for loop len: ", tot_len)
    epoch_total = 0.0  # sum of all per-image losses seen in this epoch
    set_loss = None    # loss accumulated since the last optimizer step
    # Train the model on the dataset
    for i, (images, targets) in enumerate(dataloader[phase], start=1):
        print("iteration: ", i, "Percent Complete: ", i/tot_len*100)
        # In training mode the model returns a dictionary of losses
        output = model(images, targets)
        sample_loss = output["classification"] + output["bbox_regression"]
        set_loss = sample_loss if set_loss is None else set_loss + sample_loss
        # Optimize after every `batch_size` images; the second condition
        # flushes a trailing group that is smaller than `batch_size`
        if i % batch_size == 0 or i == tot_len:
            optimizer.zero_grad()
            set_loss.backward()
            loss_value = set_loss.item()
            # Save model after batch for verbose logging
            print("Saving model at iter: ", i, " of epoch: ", epoch, " with loss: ", loss_value)
            torch.save(model.state_dict(), save_dir + "model_epoch_"+str(epoch)+"_iteration_"+str(i)+"_loss_"+str(loss_value)+".zip")
            optimizer.step()
            epoch_total += loss_value
            set_loss = None
    # Guard against an empty dataloader when averaging
    epoch_loss = epoch_total / tot_len if tot_len else 0.0
    print('epoch loss: ', epoch_loss)
    running_loss.append(epoch_loss)
    # ReduceLROnPlateau counts its patience in calls to step(); feeding it one
    # value per epoch keeps noisy single-image losses from collapsing the
    # learning rate within a few dozen iterations
    scheduler.step(epoch_loss)
# Save the model after completing the training
torch.save(model.state_dict(), save_dir+"model_complete.zip")

Validation Jupyter Notebook

Python
This Jupyter notebook is used to validate the trained neural network
{
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5-final"
  },
  "orig_nbformat": 2,
  "kernelspec": {
   "name": "python38664bit93e07db822b9475e8eac2c4d0dab897e",
   "display_name": "Python 3.8.6 64-bit",
   "language": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2,
 "cells": [
  {
   "source": [
    "import torch\n",
    "import torchvision as tv\n",
    "import pandas as pd\n",
    "import os\n",
    "import cv2\n",
    "import numpy as np\n",
    "from torchvision import models, io, transforms, utils\n",
    "import matplotlib.pyplot as plt\n",
    "import matplotlib.patches as patches\n",
    "from torch.utils.data import Dataset, DataLoader\n",
    "from PIL import Image\n",
    "import time\n",
    "import copy\n",
    "import torch.nn as nn\n",
    "import torch.optim as optim\n",
    "from torch.optim import lr_scheduler\n",
    "import torchvision\n",
    "from torchvision.models.detection.anchor_utils import AnchorGenerator\n",
    "from torchvision.ops.feature_pyramid_network import LastLevelP6P7\n",
    "from torchvision.models.detection.backbone_utils import resnet_fpn_backbone\n",
    "from os import listdir\n",
    "from os.path import isfile, join"
   ],
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "device = torch.device('cpu')"
   ]
  },
  {
   "source": [
    "class StanfordDroneDataset():\n",
    "    #* type = ['test', 'train', 'val']\n",
    "    def __init__(self, transform, type, device):\n",
    "        #* Import CSV\n",
    "        self.device = device\n",
    "        # import labels\n",
    "        label_file = '/path_to_labels/labels.csv'\n",
    "        labels_df = pd.read_csv(label_file, header = None, names = ['category', 'label'])\n",
    "        labels_df = labels_df.set_index('category')\n",
    "        # Save transformation\n",
    "        self.transform = transform\n",
    "\n",
    "        # Load file paths\n",
    "        mypath = r'/path_to_personal_image_files/'\n",
    "        onlyfiles = [join(mypath, f) for f in listdir(mypath) if isfile(join(mypath, f))]\n",
    "        self.paths = onlyfiles\n",
    "       \n",
    "    def __getitem__(self, idx):\n",
    "\n",
    "        img_path = self.paths[idx]\n",
    "        img_pil = Image.open(img_path).convert(\"RGB\")\n",
    "        img = self.transform(img_pil).to(self.device)\n",
    "        \n",
    "        return img\n",
    "    \n",
    "    def __len__(self):\n",
    "        return len(self.paths)"
   ],
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": []
  },
  {
   "source": [
    "# Model Setup"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "model= models.detection.retinanet_resnet50_fpn(num_classes=7, pretrained=False, pretrained_backbone=True)\n",
    "#! Generate smaller anchors -- copied directly from model setup\n",
    "anchor_sizes = tuple((x, int(x * 2 ** (1.0 / 3)), int(x * 2 ** (2.0 / 3))) for x in [16, 32, 64, 128, 256])\n",
    "aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)\n",
    "anchor_generator = AnchorGenerator(anchor_sizes, aspect_ratios)\n",
    "#! Update the anchor generator inside the model\n",
    "model.anchor_generator = anchor_generator\n",
    "model.load_state_dict(torch.load('path_to_final_model'))\n",
    "model.to(device)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "normalize = transforms.Compose([\n",
    "                                transforms.ToTensor(), \n",
    "                                ])\n",
    "phases = ['personal']\n",
    "#* Import Datasets\n",
    "dataset = {x: StanfordDroneDataset(normalize, x, device) for x in phases}\n",
    "dataloader = {x: torch.utils.data.DataLoader(dataset[x], batch_size=1, shuffle = True) for x in phases}\n",
    "#* Get dataset sizes\n",
    "dataset_sizes = {x: len(dataset[x]) for x in phases}\n",
    "print('done loading data')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    " phase = 'personal'\n",
    " model.eval()\n",
    " for image in dataloader[phase]:\n",
    "        output = model(image)\n",
    "        fig, ax = plt.subplots(1)\n",
    "\n",
    "        ax.imshow(transforms.ToPILImage()(image[0]), interpolation=\"bicubic\")\n",
    "        output = output[0]\n",
    "        # Perform NMS on the bounding boxes to remove duplicates. \n",
    "        idxs = torchvision.ops.nms(output['boxes'], output['scores'], 0.01)\n",
    "        \n",
    "        boxes = output['boxes'][idxs]\n",
    "        labels = output['labels'][idxs]\n",
    "        scores = output['scores'][idxs]\n",
    "\n",
    "        mean_score = torch.mean(scores)\n",
    "        std_score = torch.std(scores)\n",
    "\n",
    "        for i in range(scores.shape[0]):\n",
    "            if labels[i] in [4,5]:\n",
    "                box = output['boxes'][i]\n",
    "                box = box.detach().numpy()\n",
    "                rect = patches.Rectangle((box[0], box[1]), box[2]-box[0], box[3]-box[1], linewidth = 2, edgecolor = 'r', fill = False)\n",
    "                ax.add_patch(rect)\n",
    "                ax.text(box[0], box[1], str(scores[i].detach().numpy()))\n",
    "\n",
    "        plt.show()"
   ]
  },
  {
   "source": [
    "GSD (cm/pixel) = (sensor_width \\* altitude \\* 100) / (focal_length \\* image_width)    \n",
    "focal_length = 2.5mm    \n",
    "fov = 84    \n",
    "pixel = 1.4 x 1.4 µm   \n",
    "sensor_width = 0.003629m   \n",
    "image_width = 2582   \n",
    "GSD = 0.003629*15/2.5e-3/2582 = 0.008433 m/pixel (0.8433 cm/pixel) at an altitude of 15 m\n"
   ],
   "cell_type": "markdown",
   "metadata": {}
  }
 ]
}

Credits

LUcfarmer6

LUcfarmer6

1 project • 0 followers

Comments